2026-01-05 07:46:21 -05:00
// Copyright The Prometheus Authors
2021-11-23 13:40:49 -05:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package histogram
import (
2024-11-03 07:15:51 -05:00
"errors"
2021-11-23 13:40:49 -05:00
"fmt"
2023-08-25 17:35:42 -04:00
"math"
2021-11-23 13:40:49 -05:00
"strings"
2026-02-07 18:52:22 -05:00
"github.com/prometheus/prometheus/util/kahansum"
2021-11-23 13:40:49 -05:00
)
// FloatHistogram is similar to Histogram but uses float64 for all
// counts. Additionally, bucket counts are absolute and not deltas.
//
// A FloatHistogram is needed by PromQL to handle operations that might result
// in fractional counts. Since the counts in a histogram are unlikely to be too
// large to be represented precisely by a float64, a FloatHistogram can also be
// used to represent a histogram with integer counts and thus serves as a more
// generalized representation.
type FloatHistogram struct {
	// Counter reset information.
	CounterResetHint CounterResetHint
	// Currently valid schema numbers are -4 <= n <= 8 for exponential buckets.
	// They are all for base-2 bucket schemas, where 1 is a bucket boundary in
	// each case, and then each power of two is divided into 2^n logarithmic buckets.
	// Or in other words, each bucket boundary is the previous boundary times
	// 2^(2^-n). Another valid schema number is -53 for custom buckets, defined by
	// the CustomValues field.
	Schema int32
	// Width of the zero bucket.
	ZeroThreshold float64
	// Observations falling into the zero bucket. Must be zero or positive.
	ZeroCount float64
	// Total number of observations. Must be zero or positive.
	Count float64
	// Sum of observations. This is also used as the stale marker.
	Sum float64
	// Spans for positive and negative buckets (see Span below).
	PositiveSpans, NegativeSpans []Span
	// Observation counts in buckets. Each represents an absolute count and
	// must be zero or positive.
	PositiveBuckets, NegativeBuckets []float64
	// Holds the custom (usually upper) bounds for bucket definitions, otherwise nil.
	// This slice is interned, to be treated as immutable and copied by reference.
	// These numbers should be strictly increasing. This field is only used when the
	// schema is for custom buckets, and the ZeroThreshold, ZeroCount, NegativeSpans
	// and NegativeBuckets fields are not used in that case.
	CustomValues []float64
}
func ( h * FloatHistogram ) UsesCustomBuckets ( ) bool {
return IsCustomBucketsSchema ( h . Schema )
2021-11-23 13:40:49 -05:00
}
// Copy returns a deep copy of the Histogram.
func ( h * FloatHistogram ) Copy ( ) * FloatHistogram {
2024-01-23 11:02:14 -05:00
c := FloatHistogram {
CounterResetHint : h . CounterResetHint ,
Schema : h . Schema ,
Count : h . Count ,
Sum : h . Sum ,
}
2021-11-23 13:40:49 -05:00
2024-02-28 08:06:43 -05:00
if h . UsesCustomBuckets ( ) {
2025-05-07 08:29:35 -04:00
// Custom values are interned, so no need to copy them.
c . CustomValues = h . CustomValues
2024-02-28 08:06:43 -05:00
} else {
c . ZeroThreshold = h . ZeroThreshold
c . ZeroCount = h . ZeroCount
if len ( h . NegativeSpans ) != 0 {
c . NegativeSpans = make ( [ ] Span , len ( h . NegativeSpans ) )
copy ( c . NegativeSpans , h . NegativeSpans )
}
if len ( h . NegativeBuckets ) != 0 {
c . NegativeBuckets = make ( [ ] float64 , len ( h . NegativeBuckets ) )
copy ( c . NegativeBuckets , h . NegativeBuckets )
}
}
2024-01-23 11:02:14 -05:00
if len ( h . PositiveSpans ) != 0 {
2021-11-23 13:40:49 -05:00
c . PositiveSpans = make ( [ ] Span , len ( h . PositiveSpans ) )
copy ( c . PositiveSpans , h . PositiveSpans )
}
2024-01-23 11:02:14 -05:00
if len ( h . PositiveBuckets ) != 0 {
2021-11-23 13:40:49 -05:00
c . PositiveBuckets = make ( [ ] float64 , len ( h . PositiveBuckets ) )
copy ( c . PositiveBuckets , h . PositiveBuckets )
}
return & c
}
2024-01-23 11:02:14 -05:00
// CopyTo makes a deep copy into the given FloatHistogram.
// The destination object has to be a non-nil pointer.
func ( h * FloatHistogram ) CopyTo ( to * FloatHistogram ) {
to . CounterResetHint = h . CounterResetHint
to . Schema = h . Schema
to . Count = h . Count
to . Sum = h . Sum
2024-02-28 08:06:43 -05:00
if h . UsesCustomBuckets ( ) {
to . ZeroThreshold = 0
to . ZeroCount = 0
to . NegativeSpans = clearIfNotNil ( to . NegativeSpans )
to . NegativeBuckets = clearIfNotNil ( to . NegativeBuckets )
2025-05-07 08:29:35 -04:00
// Custom values are interned, so no need to copy them.
to . CustomValues = h . CustomValues
2024-02-28 08:06:43 -05:00
} else {
to . ZeroThreshold = h . ZeroThreshold
to . ZeroCount = h . ZeroCount
to . NegativeSpans = resize ( to . NegativeSpans , len ( h . NegativeSpans ) )
copy ( to . NegativeSpans , h . NegativeSpans )
to . NegativeBuckets = resize ( to . NegativeBuckets , len ( h . NegativeBuckets ) )
copy ( to . NegativeBuckets , h . NegativeBuckets )
2025-05-07 08:29:35 -04:00
// Custom values are interned, so no need to reset them.
to . CustomValues = nil
2024-02-28 08:06:43 -05:00
}
2024-01-23 11:02:14 -05:00
to . PositiveSpans = resize ( to . PositiveSpans , len ( h . PositiveSpans ) )
copy ( to . PositiveSpans , h . PositiveSpans )
to . PositiveBuckets = resize ( to . PositiveBuckets , len ( h . PositiveBuckets ) )
copy ( to . PositiveBuckets , h . PositiveBuckets )
}
2022-01-06 11:44:30 -05:00
// CopyToSchema works like Copy, but the returned deep copy has the provided
// target schema, which must be ≤ the original schema (i.e. it must have a lower
// resolution). This method panics if a custom buckets schema is used in the
// receiving FloatHistogram or as the provided targetSchema.
func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram {
	if targetSchema == h.Schema {
		// Fast path.
		return h.Copy()
	}
	// Resolution reduction is only defined between exponential schemas.
	if h.UsesCustomBuckets() {
		panic(fmt.Errorf("cannot reduce resolution to %d when there are custom buckets", targetSchema))
	}
	if IsCustomBucketsSchema(targetSchema) {
		panic("cannot reduce resolution to custom buckets schema")
	}
	if targetSchema > h.Schema {
		panic(fmt.Errorf("cannot copy from schema %d to %d", h.Schema, targetSchema))
	}
	c := FloatHistogram{
		Schema:        targetSchema,
		ZeroThreshold: h.ZeroThreshold,
		ZeroCount:     h.ZeroCount,
		Count:         h.Count,
		Sum:           h.Sum,
	}

	// Merge the bucket layouts down to the coarser target schema. The inputs
	// have been validated above, so a reduction failure is a programming
	// error, hence the must-variant.
	c.PositiveSpans, c.PositiveBuckets = mustReduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, false)
	c.NegativeSpans, c.NegativeBuckets = mustReduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, false)

	return &c
}
2021-11-23 13:40:49 -05:00
// String returns a string representation of the Histogram.
func ( h * FloatHistogram ) String ( ) string {
var sb strings . Builder
fmt . Fprintf ( & sb , "{count:%g, sum:%g" , h . Count , h . Sum )
2022-10-03 07:15:27 -04:00
var nBuckets [ ] Bucket [ float64 ]
2021-11-23 13:40:49 -05:00
for it := h . NegativeBucketIterator ( ) ; it . Next ( ) ; {
bucket := it . At ( )
if bucket . Count != 0 {
nBuckets = append ( nBuckets , it . At ( ) )
}
}
for i := len ( nBuckets ) - 1 ; i >= 0 ; i -- {
fmt . Fprintf ( & sb , ", %s" , nBuckets [ i ] . String ( ) )
}
if h . ZeroCount != 0 {
fmt . Fprintf ( & sb , ", %s" , h . ZeroBucket ( ) . String ( ) )
}
for it := h . PositiveBucketIterator ( ) ; it . Next ( ) ; {
bucket := it . At ( )
if bucket . Count != 0 {
fmt . Fprintf ( & sb , ", %s" , bucket . String ( ) )
}
}
sb . WriteRune ( '}' )
return sb . String ( )
}
2023-08-25 17:35:42 -04:00
// TestExpression returns the string representation of this histogram as it is used in the internal PromQL testing
// framework as well as in promtool rules unit tests.
// The syntax is described in https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#series
func (h *FloatHistogram) TestExpression() string {
	var res []string
	m := h.Copy()
	m.Compact(math.MaxInt) // Compact to reduce the number of positive and negative spans to 1.
	// Zero-valued fields are omitted from the expression; they are the
	// defaults of the test-expression syntax.
	if m.Schema != 0 {
		res = append(res, fmt.Sprintf("schema:%d", m.Schema))
	}
	if m.Count != 0 {
		res = append(res, fmt.Sprintf("count:%g", m.Count))
	}
	if m.Sum != 0 {
		res = append(res, fmt.Sprintf("sum:%g", m.Sum))
	}
	if m.ZeroCount != 0 {
		res = append(res, fmt.Sprintf("z_bucket:%g", m.ZeroCount))
	}
	if m.ZeroThreshold != 0 {
		res = append(res, fmt.Sprintf("z_bucket_w:%g", m.ZeroThreshold))
	}
	if m.UsesCustomBuckets() {
		res = append(res, fmt.Sprintf("custom_values:%g", m.CustomValues))
	}

	switch m.CounterResetHint {
	case UnknownCounterReset:
		// Unknown is the default, don't add anything.
	case CounterReset:
		res = append(res, "counter_reset_hint:reset")
	case NotCounterReset:
		res = append(res, "counter_reset_hint:not_reset")
	case GaugeType:
		res = append(res, "counter_reset_hint:gauge")
	}

	// addBuckets appends the span offset (if non-zero) and the bucket list
	// for one sign to the captured res slice and returns it. After the
	// Compact call above, more than one span per sign should not occur.
	addBuckets := func(kind, bucketsKey, offsetKey string, buckets []float64, spans []Span) []string {
		if len(spans) > 1 {
			panic(fmt.Sprintf("histogram with multiple %s spans not supported", kind))
		}
		for _, span := range spans {
			if span.Offset != 0 {
				res = append(res, fmt.Sprintf("%s:%d", offsetKey, span.Offset))
			}
		}
		var bucketStr []string
		for _, bucket := range buckets {
			bucketStr = append(bucketStr, fmt.Sprintf("%g", bucket))
		}
		if len(bucketStr) > 0 {
			res = append(res, fmt.Sprintf("%s:[%s]", bucketsKey, strings.Join(bucketStr, " ")))
		}
		return res
	}
	res = addBuckets("positive", "buckets", "offset", m.PositiveBuckets, m.PositiveSpans)
	res = addBuckets("negative", "n_buckets", "n_offset", m.NegativeBuckets, m.NegativeSpans)
	return "{{" + strings.Join(res, " ") + "}}"
}
2024-06-07 06:50:59 -04:00
// ZeroBucket returns the zero bucket. This method panics if the schema is for custom buckets.
2022-10-03 07:15:27 -04:00
func ( h * FloatHistogram ) ZeroBucket ( ) Bucket [ float64 ] {
2024-02-28 08:06:43 -05:00
if h . UsesCustomBuckets ( ) {
panic ( "histograms with custom buckets have no zero bucket" )
}
2022-10-03 07:15:27 -04:00
return Bucket [ float64 ] {
2021-11-23 13:40:49 -05:00
Lower : - h . ZeroThreshold ,
Upper : h . ZeroThreshold ,
LowerInclusive : true ,
UpperInclusive : true ,
Count : h . ZeroCount ,
2024-02-28 08:06:43 -05:00
// Index is irrelevant for the zero bucket.
2021-11-23 13:40:49 -05:00
}
}
2023-05-16 15:15:20 -04:00
// Mul multiplies the FloatHistogram by the provided factor, i.e. it scales all
2021-12-06 07:49:18 -05:00
// bucket counts including the zero bucket and the count and the sum of
// observations. The bucket layout stays the same. This method changes the
// receiving histogram directly (rather than acting on a copy). It returns a
2025-10-08 20:10:07 -04:00
// pointer to the receiving histogram for convenience. If factor is negative,
// the counter reset hint is set to GaugeType.
2023-05-16 15:15:20 -04:00
func ( h * FloatHistogram ) Mul ( factor float64 ) * FloatHistogram {
2021-12-06 07:49:18 -05:00
h . ZeroCount *= factor
h . Count *= factor
h . Sum *= factor
for i := range h . PositiveBuckets {
h . PositiveBuckets [ i ] *= factor
}
for i := range h . NegativeBuckets {
h . NegativeBuckets [ i ] *= factor
}
2025-08-05 11:37:04 -04:00
if factor < 0 {
h . CounterResetHint = GaugeType
}
2021-12-06 07:49:18 -05:00
return h
}
2023-08-24 15:02:14 -04:00
// Div works like Mul but divides instead of multiplies.
2025-10-08 20:10:07 -04:00
// When dividing by 0, everything will be set to Inf. If scalar is negative,
// the counter reset hint is set to GaugeType.
2023-05-16 15:15:20 -04:00
func ( h * FloatHistogram ) Div ( scalar float64 ) * FloatHistogram {
h . ZeroCount /= scalar
h . Count /= scalar
h . Sum /= scalar
2024-10-15 08:44:36 -04:00
// Division by zero removes all buckets.
if scalar == 0 {
h . PositiveBuckets = nil
h . NegativeBuckets = nil
h . PositiveSpans = nil
h . NegativeSpans = nil
return h
}
2023-05-16 15:15:20 -04:00
for i := range h . PositiveBuckets {
h . PositiveBuckets [ i ] /= scalar
}
for i := range h . NegativeBuckets {
h . NegativeBuckets [ i ] /= scalar
}
2025-08-05 11:37:04 -04:00
if scalar < 0 {
h . CounterResetHint = GaugeType
}
2023-05-16 15:15:20 -04:00
return h
}
2021-12-06 07:49:18 -05:00
// Add adds the provided other histogram to the receiving histogram. Count, Sum,
// and buckets from the other histogram are added to the corresponding
// components of the receiving histogram. Buckets in the other histogram that do
// not exist in the receiving histogram are inserted into the latter. The
// resulting histogram might have buckets with a population of zero or directly
// adjacent spans (offset=0). To normalize those, call the Compact method.
//
// The method reconciles differences in the zero threshold and in the schema, and
// changes them if needed. The other histogram will not be modified in any case.
// Adding is currently only supported between 2 exponential histograms, or between
// 2 custom buckets histograms with the exact same custom bounds. If CounterResetHint
// values conflict, the receiver's hint is set to unknown, and counterResetCollision
// is returned as true. A counter reset conflict occurs iff one of two histograms indicate
// a counter reset (CounterReset) while the other indicates no reset (NotCounterReset).
//
// In case of mismatched NHCB bounds, they will be reconciled to the intersection of
// both histograms, and nhcbBoundsReconciled will be returned as true.
//
// This method returns a pointer to the receiving histogram for convenience.
func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counterResetCollision, nhcbBoundsReconciled bool, err error) {
	if err := h.checkSchemaAndBounds(other); err != nil {
		return nil, false, false, err
	}
	counterResetCollision = h.adjustCounterReset(other)
	if !h.UsesCustomBuckets() {
		// Align the zero bucket widths first; the returned count is the
		// other histogram's zero count mapped to the reconciled threshold.
		otherZeroCount, _ := h.reconcileZeroBuckets(other, nil)
		h.ZeroCount += otherZeroCount
	}
	h.Count += other.Count
	h.Sum += other.Sum

	var (
		hPositiveSpans       = h.PositiveSpans
		hPositiveBuckets     = h.PositiveBuckets
		otherPositiveSpans   = other.PositiveSpans
		otherPositiveBuckets = other.PositiveBuckets
	)
	if h.UsesCustomBuckets() {
		if CustomBucketBoundsMatch(h.CustomValues, other.CustomValues) {
			h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
		} else {
			nhcbBoundsReconciled = true
			intersectedBounds := intersectCustomBucketBounds(h.CustomValues, other.CustomValues)
			// Add with mapping - maps both histograms to intersected layout.
			h.PositiveSpans, h.PositiveBuckets, _ = addCustomBucketsWithMismatches(
				false,
				hPositiveSpans, hPositiveBuckets, h.CustomValues,
				otherPositiveSpans, otherPositiveBuckets, other.CustomValues,
				nil, intersectedBounds)
			h.CustomValues = intersectedBounds
		}
		// Custom buckets histograms have no negative buckets, so we are done.
		return h, counterResetCollision, nhcbBoundsReconciled, nil
	}
	var (
		hNegativeSpans       = h.NegativeSpans
		hNegativeBuckets     = h.NegativeBuckets
		otherNegativeSpans   = other.NegativeSpans
		otherNegativeBuckets = other.NegativeBuckets
	)
	// Reconcile mismatched exponential schemas by reducing the
	// higher-resolution side down to the lower resolution.
	switch {
	case other.Schema < h.Schema:
		hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true)
		hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true)
		h.Schema = other.Schema
	case other.Schema > h.Schema:
		otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
		otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
	}

	h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
	h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, hNegativeSpans, hNegativeBuckets, otherNegativeSpans, otherNegativeBuckets)

	return h, counterResetCollision, nhcbBoundsReconciled, nil
}
2026-02-07 18:52:22 -05:00
// KahanAdd works like Add but using the Kahan summation algorithm to minimize numerical errors.
// c is a histogram holding the Kahan compensation term. It is modified in-place if non-nil.
// If c is nil, a new compensation histogram is created inside the function. In this case,
// the caller must use the returned updatedC, because the original c variable is not modified.
func (h *FloatHistogram) KahanAdd(other, c *FloatHistogram) (updatedC *FloatHistogram, counterResetCollision, nhcbBoundsReconciled bool, err error) {
	if err := h.checkSchemaAndBounds(other); err != nil {
		return nil, false, false, err
	}
	counterResetCollision = h.adjustCounterReset(other)
	if c == nil {
		c = h.newCompensationHistogram()
	}
	if !h.UsesCustomBuckets() {
		// Fold both the other histogram's zero count and the corresponding
		// compensation term into the receiver's zero count.
		otherZeroCount, otherCZeroCount := h.reconcileZeroBuckets(other, c)
		h.ZeroCount, c.ZeroCount = kahansum.Inc(otherZeroCount, h.ZeroCount, c.ZeroCount)
		h.ZeroCount, c.ZeroCount = kahansum.Inc(otherCZeroCount, h.ZeroCount, c.ZeroCount)
	}
	h.Count, c.Count = kahansum.Inc(other.Count, h.Count, c.Count)
	h.Sum, c.Sum = kahansum.Inc(other.Sum, h.Sum, c.Sum)
	var (
		hPositiveSpans       = h.PositiveSpans
		hPositiveBuckets     = h.PositiveBuckets
		otherPositiveSpans   = other.PositiveSpans
		otherPositiveBuckets = other.PositiveBuckets
		cPositiveBuckets     = c.PositiveBuckets
	)
	if h.UsesCustomBuckets() {
		if CustomBucketBoundsMatch(h.CustomValues, other.CustomValues) {
			h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets(
				h.Schema, h.ZeroThreshold, false,
				hPositiveSpans, hPositiveBuckets,
				otherPositiveSpans, otherPositiveBuckets,
				cPositiveBuckets, nil,
			)
		} else {
			nhcbBoundsReconciled = true
			intersectedBounds := intersectCustomBucketBounds(h.CustomValues, other.CustomValues)
			// Add with mapping - maps both histograms to intersected layout.
			h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = addCustomBucketsWithMismatches(
				false,
				hPositiveSpans, hPositiveBuckets, h.CustomValues,
				otherPositiveSpans, otherPositiveBuckets, other.CustomValues,
				cPositiveBuckets, intersectedBounds)
			h.CustomValues = intersectedBounds
			c.CustomValues = intersectedBounds
		}
		// Keep the compensation histogram's span layout in sync with the result.
		c.PositiveSpans = h.PositiveSpans
		return c, counterResetCollision, nhcbBoundsReconciled, nil
	}
	var (
		hNegativeSpans       = h.NegativeSpans
		hNegativeBuckets     = h.NegativeBuckets
		otherNegativeSpans   = other.NegativeSpans
		otherNegativeBuckets = other.NegativeBuckets
		cNegativeBuckets     = c.NegativeBuckets

		// Compensation terms for the other histogram's buckets, only
		// needed when the other histogram has to be reduced in resolution.
		otherCPositiveBuckets []float64
		otherCNegativeBuckets []float64
	)
	// Reconcile mismatched exponential schemas by reducing the
	// higher-resolution side down to the lower resolution.
	switch {
	case other.Schema < h.Schema:
		hPositiveSpans, hPositiveBuckets, cPositiveBuckets = kahanReduceResolution(
			hPositiveSpans, hPositiveBuckets, cPositiveBuckets,
			h.Schema, other.Schema,
			true,
		)
		hNegativeSpans, hNegativeBuckets, cNegativeBuckets = kahanReduceResolution(
			hNegativeSpans, hNegativeBuckets, cNegativeBuckets,
			h.Schema, other.Schema,
			true,
		)
		h.Schema = other.Schema
	case other.Schema > h.Schema:
		if len(otherPositiveBuckets) > 0 {
			otherCPositiveBuckets = make([]float64, len(otherPositiveBuckets))
			otherPositiveSpans, otherPositiveBuckets, otherCPositiveBuckets = kahanReduceResolution(
				otherPositiveSpans, otherPositiveBuckets, otherCPositiveBuckets,
				other.Schema, h.Schema,
				false,
			)
		}
		if len(otherNegativeBuckets) > 0 {
			otherCNegativeBuckets = make([]float64, len(otherNegativeBuckets))
			otherNegativeSpans, otherNegativeBuckets, otherCNegativeBuckets = kahanReduceResolution(
				otherNegativeSpans, otherNegativeBuckets, otherCNegativeBuckets,
				other.Schema, h.Schema,
				false,
			)
		}
	}
	h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets(
		h.Schema, h.ZeroThreshold, false,
		hPositiveSpans, hPositiveBuckets,
		otherPositiveSpans, otherPositiveBuckets,
		cPositiveBuckets, otherCPositiveBuckets,
	)
	h.NegativeSpans, h.NegativeBuckets, c.NegativeBuckets = kahanAddBuckets(
		h.Schema, h.ZeroThreshold, false,
		hNegativeSpans, hNegativeBuckets,
		otherNegativeSpans, otherNegativeBuckets,
		cNegativeBuckets, otherCNegativeBuckets,
	)
	// Keep the compensation histogram's schema and layout in sync with the result.
	c.Schema = h.Schema
	c.ZeroThreshold = h.ZeroThreshold
	c.PositiveSpans = h.PositiveSpans
	c.NegativeSpans = h.NegativeSpans
	return c, counterResetCollision, nhcbBoundsReconciled, nil
}
2025-10-08 20:10:07 -04:00
// Sub works like Add but subtracts the other histogram. It uses the same logic
// to adjust the counter reset hint. This is useful where this method is used
// for incremental mean calculation. However, if it is used for the actual "-"
// operator in PromQL, the counter reset needs to be set to GaugeType after
// calling this method.
func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counterResetCollision, nhcbBoundsReconciled bool, err error) {
	if err := h.checkSchemaAndBounds(other); err != nil {
		return nil, false, false, err
	}
	counterResetCollision = h.adjustCounterReset(other)
	if !h.UsesCustomBuckets() {
		// Align the zero bucket widths first; the returned count is the
		// other histogram's zero count mapped to the reconciled threshold.
		otherZeroCount, _ := h.reconcileZeroBuckets(other, nil)
		h.ZeroCount -= otherZeroCount
	}
	h.Count -= other.Count
	h.Sum -= other.Sum

	var (
		hPositiveSpans       = h.PositiveSpans
		hPositiveBuckets     = h.PositiveBuckets
		otherPositiveSpans   = other.PositiveSpans
		otherPositiveBuckets = other.PositiveBuckets
	)
	if h.UsesCustomBuckets() {
		if CustomBucketBoundsMatch(h.CustomValues, other.CustomValues) {
			h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
		} else {
			nhcbBoundsReconciled = true
			intersectedBounds := intersectCustomBucketBounds(h.CustomValues, other.CustomValues)
			// Subtract with mapping - maps both histograms to intersected layout.
			h.PositiveSpans, h.PositiveBuckets, _ = addCustomBucketsWithMismatches(
				true,
				hPositiveSpans, hPositiveBuckets, h.CustomValues,
				otherPositiveSpans, otherPositiveBuckets, other.CustomValues,
				nil, intersectedBounds)
			h.CustomValues = intersectedBounds
		}
		// Custom buckets histograms have no negative buckets, so we are done.
		return h, counterResetCollision, nhcbBoundsReconciled, nil
	}
	var (
		hNegativeSpans       = h.NegativeSpans
		hNegativeBuckets     = h.NegativeBuckets
		otherNegativeSpans   = other.NegativeSpans
		otherNegativeBuckets = other.NegativeBuckets
	)
	// Reconcile mismatched exponential schemas by reducing the
	// higher-resolution side down to the lower resolution.
	switch {
	case other.Schema < h.Schema:
		hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true)
		hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true)
		h.Schema = other.Schema
	case other.Schema > h.Schema:
		otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
		otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
	}

	h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
	h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, hNegativeSpans, hNegativeBuckets, otherNegativeSpans, otherNegativeBuckets)

	return h, counterResetCollision, nhcbBoundsReconciled, nil
}
2022-12-28 03:55:07 -05:00
// Equals returns true if the given float histogram matches exactly.
// Exact match is when there are no new buckets (even empty) and no missing buckets,
// and all the bucket values match. Spans can have different empty length spans in between,
// but they must represent the same bucket layout to match.
2023-10-14 14:30:40 -04:00
// Sum, Count, ZeroCount and bucket values are compared based on their bit patterns
// because this method is about data equality rather than mathematical equality.
2024-02-28 08:06:43 -05:00
// We ignore fields that are not used based on the exponential / custom buckets schema,
// but check fields where differences may cause unintended behaviour even if they are not
// supposed to be used according to the schema.
2022-12-28 03:55:07 -05:00
func ( h * FloatHistogram ) Equals ( h2 * FloatHistogram ) bool {
if h2 == nil {
2025-12-22 04:38:48 -05:00
return h == nil
2022-12-28 03:55:07 -05:00
}
2024-02-28 08:06:43 -05:00
if h . Schema != h2 . Schema ||
2023-09-28 02:06:54 -04:00
math . Float64bits ( h . Count ) != math . Float64bits ( h2 . Count ) ||
2023-09-25 09:03:55 -04:00
math . Float64bits ( h . Sum ) != math . Float64bits ( h2 . Sum ) {
2022-12-28 03:55:07 -05:00
return false
}
2024-02-28 08:06:43 -05:00
if h . UsesCustomBuckets ( ) {
2025-10-05 15:38:07 -04:00
if ! CustomBucketBoundsMatch ( h . CustomValues , h2 . CustomValues ) {
2024-02-28 08:06:43 -05:00
return false
}
}
if h . ZeroThreshold != h2 . ZeroThreshold ||
math . Float64bits ( h . ZeroCount ) != math . Float64bits ( h2 . ZeroCount ) {
2022-12-28 03:55:07 -05:00
return false
}
2024-02-28 08:06:43 -05:00
2022-12-28 03:55:07 -05:00
if ! spansMatch ( h . NegativeSpans , h2 . NegativeSpans ) {
return false
}
2025-10-05 15:38:07 -04:00
if ! floatBucketsMatch ( h . NegativeBuckets , h2 . NegativeBuckets ) {
2024-02-28 08:06:43 -05:00
return false
}
2022-12-28 03:55:07 -05:00
2024-02-28 08:06:43 -05:00
if ! spansMatch ( h . PositiveSpans , h2 . PositiveSpans ) {
2022-12-28 03:55:07 -05:00
return false
}
2025-10-05 15:38:07 -04:00
if ! floatBucketsMatch ( h . PositiveBuckets , h2 . PositiveBuckets ) {
2022-12-28 03:55:07 -05:00
return false
}
return true
}
2023-10-18 05:49:56 -04:00
// Size returns the total size of the FloatHistogram, which includes the size of the pointer
// to FloatHistogram, all its fields, and all elements contained in slices.
2023-07-28 04:49:36 -04:00
// NOTE: this is only valid for 64 bit architectures.
2023-11-29 12:23:34 -05:00
func ( h * FloatHistogram ) Size ( ) int {
2023-10-18 05:49:56 -04:00
// Size of each slice separately.
2023-11-29 12:23:34 -05:00
posSpanSize := len ( h . PositiveSpans ) * 8 // 8 bytes (int32 + uint32).
negSpanSize := len ( h . NegativeSpans ) * 8 // 8 bytes (int32 + uint32).
posBucketSize := len ( h . PositiveBuckets ) * 8 // 8 bytes (float64).
negBucketSize := len ( h . NegativeBuckets ) * 8 // 8 bytes (float64).
2024-03-22 09:36:39 -04:00
customBoundSize := len ( h . CustomValues ) * 8 // 8 bytes (float64).
2023-10-18 05:49:56 -04:00
// Total size of the struct.
// fh is 8 bytes.
// fh.CounterResetHint is 4 bytes (1 byte bool + 3 bytes padding).
// fh.Schema is 4 bytes.
// fh.ZeroThreshold is 8 bytes.
// fh.ZeroCount is 8 bytes.
// fh.Count is 8 bytes.
// fh.Sum is 8 bytes.
// fh.PositiveSpans is 24 bytes.
// fh.NegativeSpans is 24 bytes.
// fh.PositiveBuckets is 24 bytes.
// fh.NegativeBuckets is 24 bytes.
2024-03-22 09:36:39 -04:00
// fh.CustomValues is 24 bytes.
2024-02-28 08:06:43 -05:00
structSize := 168
2023-07-28 04:49:36 -04:00
2024-02-28 08:06:43 -05:00
return structSize + posSpanSize + negSpanSize + posBucketSize + negBucketSize + customBoundSize
2023-07-28 04:49:36 -04:00
}
2021-12-06 07:49:18 -05:00
// Compact eliminates empty buckets at the beginning and end of each span, then
// merges spans that are consecutive or at most maxEmptyBuckets apart, and
// finally splits spans that contain more consecutive empty buckets than
// maxEmptyBuckets. (The actual implementation might do something more efficient
// but with the same result.) The compaction happens "in place" in the
// receiving histogram, but a pointer to it is returned for convenience.
//
// The ideal value for maxEmptyBuckets depends on circumstances. The motivation
// to set maxEmptyBuckets > 0 is the assumption that it is less overhead to
// represent very few empty buckets explicitly within one span than cutting the
// one span into two to treat the empty buckets as a gap between the two spans,
// both in terms of storage requirement as well as in terms of encoding and
// decoding effort. However, the tradeoffs are subtle. For one, they are
// different in the exposition format vs. in a TSDB chunk vs. for the in-memory
// representation as Go types. In the TSDB, as an additional aspects, the span
// layout is only stored once per chunk, while many histograms with that same
// chunk layout are then only stored with their buckets (so that even a single
// empty bucket will be stored many times).
//
// For the Go types, an additional Span takes 8 bytes. Similarly, an additional
// bucket takes 8 bytes. Therefore, with a single separating empty bucket, both
// options have the same storage requirement, but the single-span solution is
// easier to iterate through. Still, the safest bet is to use maxEmptyBuckets==0
// and only use a larger number if you know what you are doing.
func (h *FloatHistogram) Compact(maxEmptyBuckets int) *FloatHistogram {
	// The second argument (the Kahan compensation buckets) is nil here;
	// plain Compact has no compensation histogram to keep in sync.
	h.PositiveBuckets, _, h.PositiveSpans = compactBuckets(
		h.PositiveBuckets, nil, h.PositiveSpans, maxEmptyBuckets, false,
	)
	h.NegativeBuckets, _, h.NegativeSpans = compactBuckets(
		h.NegativeBuckets, nil, h.NegativeSpans, maxEmptyBuckets, false,
	)
	return h
}
2026-02-07 18:52:22 -05:00
// kahanCompact works like Compact, but it is specialized for FloatHistogram's
// KahanAdd method. c is a histogram holding the Kahan compensation term; its
// buckets are compacted in lockstep with h's so both keep identical layouts.
func (h *FloatHistogram) kahanCompact(maxEmptyBuckets int, c *FloatHistogram,
) (updatedH, updatedC *FloatHistogram) {
	posBuckets, posComp, posSpans := compactBuckets(
		h.PositiveBuckets, c.PositiveBuckets, h.PositiveSpans, maxEmptyBuckets, false,
	)
	h.PositiveBuckets, c.PositiveBuckets, h.PositiveSpans = posBuckets, posComp, posSpans

	negBuckets, negComp, negSpans := compactBuckets(
		h.NegativeBuckets, c.NegativeBuckets, h.NegativeSpans, maxEmptyBuckets, false,
	)
	h.NegativeBuckets, c.NegativeBuckets, h.NegativeSpans = negBuckets, negComp, negSpans

	return h, c
}
2021-12-06 07:49:18 -05:00
// DetectReset returns true if the receiving histogram is missing any buckets
// that have a non-zero population in the provided previous histogram. It also
// returns true if any count (in any bucket, in the zero count, or in the count
// of observations, but NOT the sum of observations) is smaller in the receiving
// histogram compared to the previous histogram. Otherwise, it returns false.
//
// This method will shortcut to true if a CounterReset is detected, and shortcut
// to false if NotCounterReset is detected. Otherwise it will do the work to detect
// a reset.
//
// Special behavior in case the Schema or the ZeroThreshold are not the same in
// both histograms:
//
//   - A decrease of the ZeroThreshold or an increase of the Schema (i.e. an
//     increase of resolution) can only happen together with a reset. Thus, the
//     method returns true in either case.
//
//   - Upon an increase of the ZeroThreshold, the buckets in the previous
//     histogram that fall within the new ZeroThreshold are added to the ZeroCount
//     of the previous histogram (without mutating the provided previous
//     histogram). The scenario that a populated bucket of the previous histogram
//     is partially within, partially outside of the new ZeroThreshold, can only
//     happen together with a counter reset and therefore shortcuts to returning
//     true.
//
//   - Upon a decrease of the Schema, the buckets of the previous histogram are
//     merged so that they match the new, lower-resolution schema (again without
//     mutating the provided previous histogram).
func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool {
	if h.CounterResetHint == CounterReset {
		return true
	}
	if h.CounterResetHint == NotCounterReset {
		return false
	}
	// In all other cases of CounterResetHint (UnknownCounterReset and GaugeType),
	// we go on as we would otherwise, for reasons explained below.
	//
	// If the CounterResetHint is UnknownCounterReset, we do not know yet if this histogram comes
	// with a counter reset. Therefore, we have to do all the detailed work to find out if there
	// is a counter reset or not.
	// We do the same if the CounterResetHint is GaugeType, which should not happen, but PromQL still
	// allows the user to apply functions to gauge histograms that are only meant for counter histograms.
	// In this case, we treat the gauge histograms as counter histograms. A warning should be returned
	// to the user in this case.
	if h.Count < previous.Count {
		return true
	}
	if h.UsesCustomBuckets() {
		if !previous.UsesCustomBuckets() {
			// Mark that something has changed or that the application has been restarted. However, this does
			// not matter so much since the change in schema will be handled directly in the chunks and PromQL
			// functions.
			return true
		}
		if !CustomBucketBoundsMatch(h.CustomValues, previous.CustomValues) {
			// Custom bounds don't match - check if any reconciled bucket value has decreased.
			return h.detectResetWithMismatchedCustomBounds(previous, h.CustomValues, previous.CustomValues)
		}
	}
	if h.Schema > previous.Schema {
		return true
	}
	if h.ZeroThreshold < previous.ZeroThreshold {
		// ZeroThreshold decreased.
		return true
	}
	// No compensation histogram here (nil), we only need the plain count.
	previousZeroCount, newThreshold, _ := previous.zeroCountForLargerThreshold(h.ZeroThreshold, nil)
	if newThreshold != h.ZeroThreshold {
		// ZeroThreshold is within a populated bucket in previous
		// histogram.
		return true
	}
	if h.ZeroCount < previousZeroCount {
		return true
	}
	// Compare positive buckets, then negative buckets, both merged to the
	// current histogram's schema and zero threshold.
	currIt := h.floatBucketIterator(true, h.ZeroThreshold, h.Schema)
	prevIt := previous.floatBucketIterator(true, h.ZeroThreshold, h.Schema)
	if detectReset(&currIt, &prevIt) {
		return true
	}
	currIt = h.floatBucketIterator(false, h.ZeroThreshold, h.Schema)
	prevIt = previous.floatBucketIterator(false, h.ZeroThreshold, h.Schema)
	return detectReset(&currIt, &prevIt)
}
2023-10-09 02:40:59 -04:00
func detectReset ( currIt , prevIt * floatBucketIterator ) bool {
2021-12-06 07:49:18 -05:00
if ! prevIt . Next ( ) {
return false // If no buckets in previous histogram, nothing can be reset.
}
2023-10-09 02:40:59 -04:00
prevBucket := prevIt . strippedAt ( )
2021-12-06 07:49:18 -05:00
if ! currIt . Next ( ) {
// No bucket in current, but at least one in previous
// histogram. Check if any of those are non-zero, in which case
// this is a reset.
for {
2023-10-09 02:40:59 -04:00
if prevBucket . count != 0 {
2021-12-06 07:49:18 -05:00
return true
}
if ! prevIt . Next ( ) {
return false
}
}
}
2023-10-09 02:40:59 -04:00
currBucket := currIt . strippedAt ( )
2021-12-06 07:49:18 -05:00
for {
// Forward currIt until we find the bucket corresponding to prevBucket.
2023-10-09 02:40:59 -04:00
for currBucket . index < prevBucket . index {
2021-12-06 07:49:18 -05:00
if ! currIt . Next ( ) {
// Reached end of currIt early, therefore
// previous histogram has a bucket that the
2024-09-10 16:32:03 -04:00
// current one does not have. Unless all
2021-12-06 07:49:18 -05:00
// remaining buckets in the previous histogram
// are unpopulated, this is a reset.
for {
2023-10-09 02:40:59 -04:00
if prevBucket . count != 0 {
2021-12-06 07:49:18 -05:00
return true
}
if ! prevIt . Next ( ) {
return false
}
}
}
2023-10-09 02:40:59 -04:00
currBucket = currIt . strippedAt ( )
2021-12-06 07:49:18 -05:00
}
2023-10-09 02:40:59 -04:00
if currBucket . index > prevBucket . index {
2021-12-06 07:49:18 -05:00
// Previous histogram has a bucket the current one does
// not have. If it's populated, it's a reset.
2023-10-09 02:40:59 -04:00
if prevBucket . count != 0 {
2021-12-06 07:49:18 -05:00
return true
}
} else {
// We have reached corresponding buckets in both iterators.
// We can finally compare the counts.
2023-10-09 02:40:59 -04:00
if currBucket . count < prevBucket . count {
2021-12-06 07:49:18 -05:00
return true
}
}
if ! prevIt . Next ( ) {
// Reached end of prevIt without finding offending buckets.
return false
}
2023-10-09 02:40:59 -04:00
prevBucket = prevIt . strippedAt ( )
2021-12-06 07:49:18 -05:00
}
}
2022-10-03 07:15:27 -04:00
// PositiveBucketIterator returns a BucketIterator to iterate over all positive
// buckets in ascending order (starting next to the zero bucket and going up).
func ( h * FloatHistogram ) PositiveBucketIterator ( ) BucketIterator [ float64 ] {
2023-10-09 02:40:59 -04:00
it := h . floatBucketIterator ( true , 0 , h . Schema )
return & it
2021-11-23 13:40:49 -05:00
}
2022-10-03 07:15:27 -04:00
// NegativeBucketIterator returns a BucketIterator to iterate over all negative
// buckets in descending order (starting next to the zero bucket and going
// down).
func ( h * FloatHistogram ) NegativeBucketIterator ( ) BucketIterator [ float64 ] {
2023-10-09 02:40:59 -04:00
it := h . floatBucketIterator ( false , 0 , h . Schema )
return & it
2021-11-23 13:40:49 -05:00
}
2022-10-03 07:15:27 -04:00
// PositiveReverseBucketIterator returns a BucketIterator to iterate over all
// positive buckets in descending order (starting at the highest bucket and
// going down towards the zero bucket).
func ( h * FloatHistogram ) PositiveReverseBucketIterator ( ) BucketIterator [ float64 ] {
2024-03-22 09:36:39 -04:00
it := newReverseFloatBucketIterator ( h . PositiveSpans , h . PositiveBuckets , h . Schema , true , h . CustomValues )
2023-10-09 02:40:59 -04:00
return & it
2021-12-06 08:47:22 -05:00
}
2022-10-03 07:15:27 -04:00
// NegativeReverseBucketIterator returns a BucketIterator to iterate over all
// negative buckets in ascending order (starting at the lowest bucket and going
// up towards the zero bucket).
func ( h * FloatHistogram ) NegativeReverseBucketIterator ( ) BucketIterator [ float64 ] {
2024-02-28 08:06:43 -05:00
it := newReverseFloatBucketIterator ( h . NegativeSpans , h . NegativeBuckets , h . Schema , false , nil )
2023-10-09 02:40:59 -04:00
return & it
2021-12-06 08:47:22 -05:00
}
2022-10-03 07:15:27 -04:00
// AllBucketIterator returns a BucketIterator to iterate over all negative,
2021-12-15 10:50:37 -05:00
// zero, and positive buckets in ascending order (starting at the lowest bucket
2022-05-03 10:24:11 -04:00
// and going up). If the highest negative bucket or the lowest positive bucket
// overlap with the zero bucket, their upper or lower boundary, respectively, is
// set to the zero threshold.
2022-10-03 07:15:27 -04:00
func ( h * FloatHistogram ) AllBucketIterator ( ) BucketIterator [ float64 ] {
2022-01-06 11:44:30 -05:00
return & allFloatBucketIterator {
2023-07-05 07:05:53 -04:00
h : h ,
2024-02-28 08:06:43 -05:00
leftIter : newReverseFloatBucketIterator ( h . NegativeSpans , h . NegativeBuckets , h . Schema , false , nil ) ,
2023-10-09 02:40:59 -04:00
rightIter : h . floatBucketIterator ( true , 0 , h . Schema ) ,
2023-07-05 07:05:53 -04:00
state : - 1 ,
2022-01-06 11:44:30 -05:00
}
2021-12-06 08:47:22 -05:00
}
2023-03-27 17:54:29 -04:00
// AllReverseBucketIterator returns a BucketIterator to iterate over all negative,
// zero, and positive buckets in descending order (starting at the highest bucket
// and going down). If the highest negative bucket or the lowest positive bucket
// overlap with the zero bucket, their upper or lower boundary, respectively, is
// set to the zero threshold.
func (h *FloatHistogram) AllReverseBucketIterator() BucketIterator[float64] {
	// Left side: positive buckets traversed in reverse (highest first).
	// Right side: negative buckets in their natural descending order.
	return &allFloatBucketIterator{
		h:         h,
		leftIter:  newReverseFloatBucketIterator(h.PositiveSpans, h.PositiveBuckets, h.Schema, true, h.CustomValues),
		rightIter: h.floatBucketIterator(false, 0, h.Schema),
		state:     -1,
	}
}
2023-11-03 10:47:59 -04:00
// Validate validates consistency between span and bucket slices. Also, buckets are checked
// against negative values. We check to make sure there are no unexpected fields or field values
// based on the exponential / custom buckets schema.
// We do not check for h.Count being at least as large as the sum of the
// counts in the buckets because floating point precision issues can
// create false positives here.
func (h *FloatHistogram) Validate() error {
	var nCount, pCount float64
	switch {
	case IsCustomBucketsSchema(h.Schema):
		if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
			return fmt.Errorf("custom buckets: %w", err)
		}
		// Custom buckets schemas must not use the zero bucket or the
		// negative side; any such field being set is an inconsistency.
		if h.ZeroCount != 0 {
			return ErrHistogramCustomBucketsZeroCount
		}
		if h.ZeroThreshold != 0 {
			return ErrHistogramCustomBucketsZeroThresh
		}
		if len(h.NegativeSpans) > 0 {
			return ErrHistogramCustomBucketsNegSpans
		}
		if len(h.NegativeBuckets) > 0 {
			return ErrHistogramCustomBucketsNegBuckets
		}
	case IsExponentialSchema(h.Schema):
		if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
			return fmt.Errorf("positive side: %w", err)
		}
		if err := checkHistogramSpans(h.NegativeSpans, len(h.NegativeBuckets)); err != nil {
			return fmt.Errorf("negative side: %w", err)
		}
		err := checkHistogramBuckets(h.NegativeBuckets, &nCount, false)
		if err != nil {
			return fmt.Errorf("negative side: %w", err)
		}
		if h.ZeroCount < 0 {
			return fmt.Errorf("zero bucket has observation count of %v: %w", h.ZeroCount, ErrHistogramNegativeBucketCount)
		}
		// Conversely, exponential schemas must not carry custom bounds.
		if h.CustomValues != nil {
			return ErrHistogramExpSchemaCustomBounds
		}
	default:
		return InvalidSchemaError(h.Schema)
	}
	// Checks shared by both schema kinds: total count and positive buckets.
	if h.Count < 0 {
		return fmt.Errorf("observation count is %v: %w", h.Count, ErrHistogramNegativeCount)
	}
	err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false)
	if err != nil {
		return fmt.Errorf("positive side: %w", err)
	}
	return nil
}
2022-01-06 11:44:30 -05:00
// zeroCountForLargerThreshold returns what the histogram's zero count would be
// if the ZeroThreshold had the provided larger (or equal) value. It also returns the
// zero count of the compensation histogram `c` if provided (used for Kahan summation).
//
// If the provided ZeroThreshold is less than the histogram's ZeroThreshold, the method panics.
// If the largerThreshold ends up within a populated bucket of the histogram, it
// is adjusted upwards to the lower limit of that bucket (all in terms of
// absolute values) and that bucket's count is included in the returned
// count. The adjusted threshold is returned, too.
func (h *FloatHistogram) zeroCountForLargerThreshold(
	largerThreshold float64, c *FloatHistogram) (hZeroCount, threshold, cZeroCount float64,
) {
	if c != nil {
		cZeroCount = c.ZeroCount
	}
	// Fast path.
	if largerThreshold == h.ZeroThreshold {
		return h.ZeroCount, largerThreshold, cZeroCount
	}
	if largerThreshold < h.ZeroThreshold {
		panic(fmt.Errorf("new threshold %f is less than old threshold %f", largerThreshold, h.ZeroThreshold))
	}
	// The whole computation may have to be redone from scratch if a
	// negative bucket straddles the threshold (see `continue outer` below),
	// because the positive side was processed against a now-stale threshold.
outer:
	for {
		hZeroCount = h.ZeroCount
		i := h.PositiveBucketIterator()
		// bucketsIdx tracks the position in the flat bucket slices, so
		// the matching compensation bucket in c can be found.
		bucketsIdx := 0
		for i.Next() {
			b := i.At()
			if b.Lower >= largerThreshold {
				break
			}
			// Bucket to be merged into zero bucket. Use Kahan
			// summation to limit floating point error accumulation.
			hZeroCount, cZeroCount = kahansum.Inc(b.Count, hZeroCount, cZeroCount)
			if c != nil {
				hZeroCount, cZeroCount = kahansum.Inc(c.PositiveBuckets[bucketsIdx], hZeroCount, cZeroCount)
			}
			if b.Upper > largerThreshold {
				// New threshold ended up within a bucket. If it's
				// populated, we need to adjust largerThreshold before
				// we are done here.
				if b.Count != 0 {
					largerThreshold = b.Upper
				}
				break
			}
			bucketsIdx++
		}
		i = h.NegativeBucketIterator()
		bucketsIdx = 0
		for i.Next() {
			b := i.At()
			if b.Upper <= -largerThreshold {
				break
			}
			// Bucket to be merged into zero bucket.
			hZeroCount, cZeroCount = kahansum.Inc(b.Count, hZeroCount, cZeroCount)
			if c != nil {
				hZeroCount, cZeroCount = kahansum.Inc(c.NegativeBuckets[bucketsIdx], hZeroCount, cZeroCount)
			}
			if b.Lower < -largerThreshold {
				// New threshold ended up within a bucket. If
				// it's populated, we need to adjust
				// largerThreshold and have to redo the whole
				// thing because the treatment of the positive
				// buckets is invalid now.
				if b.Count != 0 {
					largerThreshold = -b.Lower
					continue outer
				}
				break
			}
			bucketsIdx++
		}
		return hZeroCount, largerThreshold, cZeroCount
	}
}
// trimBucketsInZeroBucket removes all buckets that are within the zero
// bucket. It assumes that the zero threshold is at a bucket boundary and that
// the counts in the buckets to remove are already part of the zero count.
// c is a histogram holding the Kahan compensation term; its corresponding
// buckets are zeroed in lockstep so that both histograms keep the same layout.
// c may be nil if no Kahan compensation is tracked.
func (h *FloatHistogram) trimBucketsInZeroBucket(c *FloatHistogram) {
	// Zero out positive buckets whose lower bound is below the zero
	// threshold. bucketsIdx tracks the position in the flat bucket slice
	// corresponding to the iterator's current bucket.
	i := h.PositiveBucketIterator()
	bucketsIdx := 0
	for i.Next() {
		b := i.At()
		if b.Lower >= h.ZeroThreshold {
			break
		}
		h.PositiveBuckets[bucketsIdx] = 0
		if c != nil {
			c.PositiveBuckets[bucketsIdx] = 0
		}
		bucketsIdx++
	}
	// Same for the negative side (bounds mirrored around zero).
	i = h.NegativeBucketIterator()
	bucketsIdx = 0
	for i.Next() {
		b := i.At()
		if b.Upper <= -h.ZeroThreshold {
			break
		}
		h.NegativeBuckets[bucketsIdx] = 0
		if c != nil {
			c.NegativeBuckets[bucketsIdx] = 0
		}
		bucketsIdx++
	}
	// We are abusing Compact to trim the buckets set to zero
	// above. Premature compacting could cause additional cost, but this
	// code path is probably rarely used anyway.
	if c != nil {
		h.kahanCompact(0, c)
	} else {
		h.Compact(0)
	}
}
// reconcileZeroBuckets finds a zero bucket large enough to include the zero
// buckets of both histograms (the receiving histogram and the other histogram)
// with a zero threshold that is not within a populated bucket in either
// histogram. This method modifies the receiving histogram accordingly, and
// also modifies the compensation histogram `c` (used for Kahan summation) if provided,
// but leaves the other histogram as is. Instead, it returns the zero count the
// other histogram would have if it were modified, as well as its Kahan compensation term.
func (h *FloatHistogram) reconcileZeroBuckets(other, c *FloatHistogram) (otherZeroCount, otherCZeroCount float64) {
	otherZeroCount = other.ZeroCount
	otherZeroThreshold := other.ZeroThreshold
	// Keep growing whichever threshold is smaller until both agree. Each
	// growth can overshoot (when the threshold lands inside a populated
	// bucket it is bumped to that bucket's boundary), so this may take
	// several rounds before the loop condition is satisfied.
	for otherZeroThreshold != h.ZeroThreshold {
		if h.ZeroThreshold > otherZeroThreshold {
			// other is only inspected, never mutated, hence no
			// compensation histogram is passed (nil).
			otherZeroCount, otherZeroThreshold, otherCZeroCount = other.zeroCountForLargerThreshold(h.ZeroThreshold, nil)
		}
		if otherZeroThreshold > h.ZeroThreshold {
			var cZeroCount float64
			h.ZeroCount, h.ZeroThreshold, cZeroCount = h.zeroCountForLargerThreshold(otherZeroThreshold, c)
			if c != nil {
				c.ZeroCount = cZeroCount
			}
			h.trimBucketsInZeroBucket(c)
		}
	}
	return otherZeroCount, otherCZeroCount
}
// floatBucketIterator is a low-level constructor for bucket iterators.
//
// If positive is true, the returned iterator iterates through the positive
// buckets, otherwise through the negative buckets.
//
2024-02-28 08:06:43 -05:00
// Only for exponential schemas, if absoluteStartValue is < the lowest absolute
// value of any upper bucket boundary, the iterator starts with the first bucket.
// Otherwise, it will skip all buckets with an absolute value of their upper boundary ≤
// absoluteStartValue. For custom bucket schemas, absoluteStartValue is ignored and
// no buckets are skipped.
2022-01-06 11:44:30 -05:00
//
// targetSchema must be ≤ the schema of FloatHistogram (and of course within the
// legal values for schemas in general). The buckets are merged to match the
2024-06-07 06:50:59 -04:00
// targetSchema prior to iterating (without mutating FloatHistogram), but custom buckets
// schemas cannot be merged with other schemas.
2022-01-06 11:44:30 -05:00
func ( h * FloatHistogram ) floatBucketIterator (
positive bool , absoluteStartValue float64 , targetSchema int32 ,
2023-10-09 02:40:59 -04:00
) floatBucketIterator {
2024-02-28 08:06:43 -05:00
if h . UsesCustomBuckets ( ) && targetSchema != h . Schema {
2024-11-03 07:15:51 -05:00
panic ( errors . New ( "cannot merge from custom buckets schema to exponential schema" ) )
2024-02-28 08:06:43 -05:00
}
if ! h . UsesCustomBuckets ( ) && IsCustomBucketsSchema ( targetSchema ) {
2024-11-03 07:15:51 -05:00
panic ( errors . New ( "cannot merge from exponential buckets schema to custom schema" ) )
2024-02-28 08:06:43 -05:00
}
2022-01-06 11:44:30 -05:00
if targetSchema > h . Schema {
panic ( fmt . Errorf ( "cannot merge from schema %d to %d" , h . Schema , targetSchema ) )
}
2023-10-09 02:40:59 -04:00
i := floatBucketIterator {
2022-10-03 07:15:27 -04:00
baseBucketIterator : baseBucketIterator [ float64 , float64 ] {
schema : h . Schema ,
positive : positive ,
} ,
2023-11-29 01:54:05 -05:00
targetSchema : targetSchema ,
absoluteStartValue : absoluteStartValue ,
boundReachedStartValue : absoluteStartValue == 0 ,
2022-01-06 11:44:30 -05:00
}
if positive {
i . spans = h . PositiveSpans
i . buckets = h . PositiveBuckets
2024-03-22 09:36:39 -04:00
i . customValues = h . CustomValues
2022-01-06 11:44:30 -05:00
} else {
i . spans = h . NegativeSpans
i . buckets = h . NegativeBuckets
}
return i
2021-11-23 13:40:49 -05:00
}
2025-07-10 02:43:25 -04:00
// newReverseFloatBucketIterator is a low-level constructor for reverse bucket iterators.
2022-10-03 07:15:27 -04:00
func newReverseFloatBucketIterator (
2024-03-22 09:36:39 -04:00
spans [ ] Span , buckets [ ] float64 , schema int32 , positive bool , customValues [ ] float64 ,
2023-10-09 02:40:59 -04:00
) reverseFloatBucketIterator {
r := reverseFloatBucketIterator {
2022-10-03 07:15:27 -04:00
baseBucketIterator : baseBucketIterator [ float64 , float64 ] {
2024-02-28 08:06:43 -05:00
schema : schema ,
spans : spans ,
buckets : buckets ,
positive : positive ,
2024-03-22 09:36:39 -04:00
customValues : customValues ,
2022-10-03 07:15:27 -04:00
} ,
2021-11-23 13:40:49 -05:00
}
2022-01-06 11:44:30 -05:00
r . spansIdx = len ( r . spans ) - 1
r . bucketsIdx = len ( r . buckets ) - 1
if r . spansIdx >= 0 {
r . idxInSpan = int32 ( r . spans [ r . spansIdx ] . Length ) - 1
}
r . currIdx = 0
for _ , s := range r . spans {
r . currIdx += s . Offset + int32 ( s . Length )
}
2021-11-23 13:40:49 -05:00
return r
}
// floatBucketIterator iterates over the buckets of a FloatHistogram in one
// direction (positive or negative), optionally merging buckets on the fly
// into a coarser target schema and optionally skipping buckets whose upper
// boundary is below an absolute start value.
type floatBucketIterator struct {
	baseBucketIterator[float64, float64]

	targetSchema int32 // targetSchema is the schema to merge to and must be ≤ schema.
	origIdx      int32 // The bucket index within the original schema.

	absoluteStartValue float64 // Never return buckets with an upper bound ≤ this value.

	boundReachedStartValue bool // Has getBound reached absoluteStartValue already?
}
// At returns the bucket the iterator currently points to.
func (i *floatBucketIterator) At() Bucket[float64] {
	// Need to use i.targetSchema rather than i.baseBucketIterator.schema,
	// because the iterator presents buckets merged into the (possibly
	// coarser) target schema, whose bucket boundaries differ.
	return i.at(i.targetSchema)
}
2022-01-06 11:44:30 -05:00
func ( i * floatBucketIterator ) Next ( ) bool {
if i . spansIdx >= len ( i . spans ) {
2021-11-23 13:40:49 -05:00
return false
}
model/histogram: Make histogram bucket iterators more robust
Currently, iterating over histogram buckets can panic if the spans are
not consistent with the buckets. We aim for validating histograms upon
ingestion, but there might still be data corruptions on disk that
could trigger the panic. While data corruption on disk is really bad
and will lead to all kind of weirdness, we should still avoid
panic'ing.
Note, though, that chunks are secured by checksums, so the corruptions
won't realistically happen because of disk faults, but more likely
because a chunk was generated in a faulty way in the first place, by
a software bug or even maliciously.
This commit prevents panics in the situation where there are fewer
buckets than described by the spans. Note that the missing buckets
will simply not be iterated over. There is no signalling of this
problem. We might still consider this separately, but for now, I would
say that this kind of corruption is exceedingly rare and doesn't
deserve special treatment (which will add a whole lot of complexity to
the code).
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-18 16:59:46 -05:00
span := i . spans [ i . spansIdx ]
2022-01-06 11:44:30 -05:00
2023-11-29 01:54:05 -05:00
if i . schema == i . targetSchema {
// Fast path for the common case.
2022-01-06 11:44:30 -05:00
if i . bucketsIdx == 0 {
// Seed origIdx for the first bucket.
2023-11-29 01:54:05 -05:00
i . currIdx = span . Offset
2022-01-06 11:44:30 -05:00
} else {
2023-11-29 01:54:05 -05:00
i . currIdx ++
2022-01-06 11:44:30 -05:00
}
model/histogram: Make histogram bucket iterators more robust
Currently, iterating over histogram buckets can panic if the spans are
not consistent with the buckets. We aim for validating histograms upon
ingestion, but there might still be data corruptions on disk that
could trigger the panic. While data corruption on disk is really bad
and will lead to all kind of weirdness, we should still avoid
panic'ing.
Note, though, that chunks are secured by checksums, so the corruptions
won't realistically happen because of disk faults, but more likely
because a chunk was generated in a faulty way in the first place, by
a software bug or even maliciously.
This commit prevents panics in the situation where there are fewer
buckets than described by the spans. Note that the missing buckets
will simply not be iterated over. There is no signalling of this
problem. We might still consider this separately, but for now, I would
say that this kind of corruption is exceedingly rare and doesn't
deserve special treatment (which will add a whole lot of complexity to
the code).
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-18 16:59:46 -05:00
if i . bucketsIdx >= len ( i . buckets ) {
// This protects against index out of range panic, which
// can only happen with an invalid histogram.
return false
}
2023-11-29 01:54:05 -05:00
for i . idxInSpan >= span . Length {
2022-01-06 11:44:30 -05:00
// We have exhausted the current span and have to find a new
// one. We even handle pathologic spans of length 0 here.
2023-11-29 01:54:05 -05:00
i . idxInSpan = 0
i . spansIdx ++
if i . spansIdx >= len ( i . spans ) {
return false
2022-01-06 11:44:30 -05:00
}
2023-11-29 01:54:05 -05:00
span = i . spans [ i . spansIdx ]
i . currIdx += span . Offset
2022-01-06 11:44:30 -05:00
}
2023-11-29 01:54:05 -05:00
i . currCount = i . buckets [ i . bucketsIdx ]
i . idxInSpan ++
2022-01-06 11:44:30 -05:00
i . bucketsIdx ++
2023-11-29 01:54:05 -05:00
} else {
// Copy all of these into local variables so that we can forward to the
// next bucket and then roll back if needed.
origIdx , spansIdx , idxInSpan := i . origIdx , i . spansIdx , i . idxInSpan
firstPass := true
i . currCount = 0
mergeLoop : // Merge together all buckets from the original schema that fall into one bucket in the targetSchema.
for {
if i . bucketsIdx == 0 {
// Seed origIdx for the first bucket.
origIdx = span . Offset
} else {
origIdx ++
}
model/histogram: Make histogram bucket iterators more robust
Currently, iterating over histogram buckets can panic if the spans are
not consistent with the buckets. We aim for validating histograms upon
ingestion, but there might still be data corruptions on disk that
could trigger the panic. While data corruption on disk is really bad
and will lead to all kind of weirdness, we should still avoid
panic'ing.
Note, though, that chunks are secured by checksums, so the corruptions
won't realistically happen because of disk faults, but more likely
because a chunk was generated in a faulty way in the first place, by
a software bug or even maliciously.
This commit prevents panics in the situation where there are fewer
buckets than described by the spans. Note that the missing buckets
will simply not be iterated over. There is no signalling of this
problem. We might still consider this separately, but for now, I would
say that this kind of corruption is exceedingly rare and doesn't
deserve special treatment (which will add a whole lot of complexity to
the code).
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-18 16:59:46 -05:00
if i . bucketsIdx >= len ( i . buckets ) {
// This protects against index out of range panic, which
// can only happen with an invalid histogram.
if firstPass {
return false
}
break mergeLoop
}
2023-11-29 01:54:05 -05:00
for idxInSpan >= span . Length {
// We have exhausted the current span and have to find a new
// one. We even handle pathologic spans of length 0 here.
idxInSpan = 0
spansIdx ++
if spansIdx >= len ( i . spans ) {
if firstPass {
return false
}
break mergeLoop
}
span = i . spans [ spansIdx ]
origIdx += span . Offset
}
currIdx := targetIdx ( origIdx , i . schema , i . targetSchema )
switch {
case firstPass :
i . currIdx = currIdx
firstPass = false
case currIdx != i . currIdx :
// Reached next bucket in targetSchema.
// Do not actually forward to the next bucket, but break out.
break mergeLoop
}
i . currCount += i . buckets [ i . bucketsIdx ]
idxInSpan ++
i . bucketsIdx ++
i . origIdx , i . spansIdx , i . idxInSpan = origIdx , spansIdx , idxInSpan
if i . schema == i . targetSchema {
// Don't need to test the next bucket for mergeability
// if we have no schema change anyway.
break mergeLoop
}
2021-11-23 13:40:49 -05:00
}
}
2023-11-29 01:54:05 -05:00
2024-02-28 08:06:43 -05:00
// Skip buckets before absoluteStartValue for exponential schemas.
2022-01-06 11:44:30 -05:00
// TODO(beorn7): Maybe do something more efficient than this recursive call.
2024-02-28 08:06:43 -05:00
if ! i . boundReachedStartValue && IsExponentialSchema ( i . targetSchema ) && getBoundExponential ( i . currIdx , i . targetSchema ) <= i . absoluteStartValue {
2022-01-06 11:44:30 -05:00
return i . Next ( )
}
2023-10-09 02:40:59 -04:00
i . boundReachedStartValue = true
2022-01-06 11:44:30 -05:00
return true
}
2021-11-23 13:40:49 -05:00
2021-12-06 08:47:22 -05:00
// reverseFloatBucketIterator iterates over the buckets of a FloatHistogram in
// reverse order, i.e. from the highest to the lowest bucket index.
type reverseFloatBucketIterator struct {
	baseBucketIterator[float64, float64]
	idxInSpan int32 // Changed from uint32 to allow negative values for exhaustion detection.
}
// Next advances the iterator to the next (i.e. lower-indexed) bucket and
// returns whether there is one. If the histogram is invalid and describes
// more buckets in its spans than are actually present, iteration ends early
// instead of panicking.
func (i *reverseFloatBucketIterator) Next() bool {
	i.currIdx--
	if i.bucketsIdx < 0 {
		// All buckets have been consumed.
		return false
	}

	for i.idxInSpan < 0 {
		// We have exhausted the current span and have to find a new
		// one. We'll even handle pathologic spans of length 0.
		i.spansIdx--
		if i.spansIdx < 0 {
			// This protects against index out of range panic, which
			// can only happen with an invalid histogram.
			return false
		}
		i.idxInSpan = int32(i.spans[i.spansIdx].Length) - 1
		// Skip the index gap encoded by the offset of the span we just left.
		i.currIdx -= i.spans[i.spansIdx+1].Offset
	}

	i.currCount = i.buckets[i.bucketsIdx]
	i.bucketsIdx--
	i.idxInSpan--
	return true
}
// allFloatBucketIterator iterates over all buckets of a FloatHistogram: the
// negative buckets (via leftIter, in reverse order), then the zero bucket if
// populated, then the positive buckets (via rightIter).
type allFloatBucketIterator struct {
	h         *FloatHistogram
	leftIter  reverseFloatBucketIterator
	rightIter floatBucketIterator
	// -1 means we are iterating negative buckets.
	// 0 means it is time for the zero bucket.
	// 1 means we are iterating positive buckets.
	// Anything else means iteration is over.
	state      int8
	currBucket Bucket[float64]
}
2022-10-03 07:15:27 -04:00
func ( i * allFloatBucketIterator ) Next ( ) bool {
switch i . state {
2021-12-06 08:47:22 -05:00
case - 1 :
2023-07-05 07:05:53 -04:00
if i . leftIter . Next ( ) {
i . currBucket = i . leftIter . At ( )
switch {
case i . currBucket . Upper < 0 && i . currBucket . Upper > - i . h . ZeroThreshold :
2022-10-03 07:15:27 -04:00
i . currBucket . Upper = - i . h . ZeroThreshold
2023-07-05 07:05:53 -04:00
case i . currBucket . Lower > 0 && i . currBucket . Lower < i . h . ZeroThreshold :
i . currBucket . Lower = i . h . ZeroThreshold
2022-05-03 10:24:11 -04:00
}
2021-12-06 08:47:22 -05:00
return true
}
2022-10-03 07:15:27 -04:00
i . state = 0
return i . Next ( )
2021-12-06 08:47:22 -05:00
case 0 :
2022-10-03 07:15:27 -04:00
i . state = 1
if i . h . ZeroCount > 0 {
2024-02-28 08:06:43 -05:00
i . currBucket = i . h . ZeroBucket ( )
2021-12-06 08:47:22 -05:00
return true
}
2022-10-03 07:15:27 -04:00
return i . Next ( )
2021-12-06 08:47:22 -05:00
case 1 :
2023-07-05 07:05:53 -04:00
if i . rightIter . Next ( ) {
i . currBucket = i . rightIter . At ( )
switch {
case i . currBucket . Lower > 0 && i . currBucket . Lower < i . h . ZeroThreshold :
2022-10-03 07:15:27 -04:00
i . currBucket . Lower = i . h . ZeroThreshold
2023-07-05 07:05:53 -04:00
case i . currBucket . Upper < 0 && i . currBucket . Upper > - i . h . ZeroThreshold :
2023-03-27 17:54:29 -04:00
i . currBucket . Upper = - i . h . ZeroThreshold
}
return true
}
i . state = 42
return false
}
return false
}
2023-07-05 07:05:53 -04:00
// At returns the bucket the iterator currently points to.
func (i *allFloatBucketIterator) At() Bucket[float64] {
	return i.currBucket
}
2023-07-27 07:27:13 -04:00
// targetIdx returns the bucket index in the target schema for the given bucket
// index idx in the original schema.
func targetIdx(idx, originSchema, targetSchema int32) int32 {
	// Each target bucket merges 2^(originSchema-targetSchema) original
	// buckets. The -1/+1 aligns the merge boundaries with the bucket
	// boundaries shared between the two schemas; the arithmetic right
	// shift keeps this correct for negative indices.
	shift := originSchema - targetSchema
	return ((idx - 1) >> shift) + 1
}
2023-08-23 19:17:23 -04:00
// addBuckets adds the buckets described by spansB/bucketsB to the buckets described by spansA/bucketsA,
// creating missing buckets in spansA/bucketsA as needed.
// It returns the resulting spans/buckets (which must be used instead of the original spansA/bucketsA,
// although spansA/bucketsA might get modified by this function).
// All buckets must use the same provided schema.
// Buckets in spansB/bucketsB with an absolute upper limit ≤ threshold are ignored.
// If negative is true, the buckets in spansB/bucketsB are subtracted rather than added.
func addBuckets(
	schema int32, threshold float64, negative bool,
	spansA []Span, bucketsA []float64,
	spansB []Span, bucketsB []float64,
) ([]Span, []float64) {
	var (
		iSpan              = -1   // Index into spansA of the span at the current A position; -1 until the first B bucket is placed.
		iBucket            = -1   // Index into bucketsA at the current A position.
		iInSpan            int32  // Offset of the current A position within spansA[iSpan].
		indexA             int32  // Absolute bucket index of the current A position.
		indexB             int32  // Absolute bucket index of the current B bucket.
		bIdxB              int    // Index into bucketsB of the current B bucket.
		bucketB            float64 // Value of the current B bucket (negated if negative).
		deltaIndex         int32  // Distance in absolute bucket indices from the A position to indexB.
		lowerThanThreshold = true // Still possibly below threshold? Checked only until the first bucket above it.
	)

	for _, spanB := range spansB {
		indexB += spanB.Offset
		for j := 0; j < int(spanB.Length); j++ {
			// Ignore B buckets whose upper limit is within the zero bucket
			// (only relevant for exponential schemas).
			if lowerThanThreshold && IsExponentialSchema(schema) && getBoundExponential(indexB, schema) <= threshold {
				goto nextLoop
			}
			lowerThanThreshold = false

			bucketB = bucketsB[bIdxB]
			if negative {
				bucketB *= -1
			}

			if iSpan == -1 {
				// Not yet positioned within A.
				if len(spansA) == 0 || spansA[0].Offset > indexB {
					// Add bucket before all others.
					bucketsA = append(bucketsA, 0)
					copy(bucketsA[1:], bucketsA)
					bucketsA[0] = bucketB
					if len(spansA) > 0 && spansA[0].Offset == indexB+1 {
						// New bucket is directly adjacent to the first span;
						// grow that span to the left.
						spansA[0].Length++
						spansA[0].Offset--
						goto nextLoop
					}
					// Otherwise prepend a new one-bucket span.
					spansA = append(spansA, Span{})
					copy(spansA[1:], spansA)
					spansA[0] = Span{Offset: indexB, Length: 1}
					if len(spansA) > 1 {
						// Convert the absolute offset in the formerly
						// first span to a relative offset.
						spansA[1].Offset -= indexB + 1
					}
					goto nextLoop
				} else if spansA[0].Offset == indexB {
					// Just add to first bucket.
					bucketsA[0] += bucketB
					goto nextLoop
				}
				// Start scanning A from its first bucket.
				iSpan, iBucket, iInSpan = 0, 0, 0
				indexA = spansA[0].Offset
			}
			deltaIndex = indexB - indexA
			for {
				remainingInSpan := int32(spansA[iSpan].Length) - iInSpan
				if deltaIndex < remainingInSpan {
					// Bucket is in current span.
					iBucket += int(deltaIndex)
					iInSpan += deltaIndex
					bucketsA[iBucket] += bucketB
					break
				}
				deltaIndex -= remainingInSpan
				iBucket += int(remainingInSpan)
				iSpan++
				if iSpan == len(spansA) || deltaIndex < spansA[iSpan].Offset {
					// Bucket is in gap behind previous span (or there are no further spans).
					bucketsA = append(bucketsA, 0)
					copy(bucketsA[iBucket+1:], bucketsA[iBucket:])
					bucketsA[iBucket] = bucketB
					switch {
					case deltaIndex == 0:
						// Directly after previous span, extend previous span.
						if iSpan < len(spansA) {
							spansA[iSpan].Offset--
						}
						iSpan--
						iInSpan = int32(spansA[iSpan].Length)
						spansA[iSpan].Length++
						goto nextLoop
					case iSpan < len(spansA) && deltaIndex == spansA[iSpan].Offset-1:
						// Directly before next span, extend next span.
						iInSpan = 0
						spansA[iSpan].Offset--
						spansA[iSpan].Length++
						goto nextLoop
					default:
						// No next span, or next span is not directly adjacent to new bucket.
						// Add new span.
						iInSpan = 0
						if iSpan < len(spansA) {
							spansA[iSpan].Offset -= deltaIndex + 1
						}
						spansA = append(spansA, Span{})
						copy(spansA[iSpan+1:], spansA[iSpan:])
						spansA[iSpan] = Span{Length: 1, Offset: deltaIndex}
						goto nextLoop
					}
				} else {
					// Try start of next span.
					deltaIndex -= spansA[iSpan].Offset
					iInSpan = 0
				}
			}

		nextLoop:
			indexA = indexB
			indexB++
			bIdxB++
		}
	}
	return spansA, bucketsA
}
2023-10-09 09:09:46 -04:00
2026-02-07 18:52:22 -05:00
// kahanAddBuckets works like addBuckets but it is used in FloatHistogram's KahanAdd method
// and takes additional arguments, compensationBucketsA and compensationBucketsB,
// which hold the Kahan compensation values associated with histograms A and B.
// It returns the resulting spans/buckets and compensation buckets.
func kahanAddBuckets(
	schema int32, threshold float64, negative bool,
	spansA []Span, bucketsA []float64,
	spansB []Span, bucketsB []float64,
	compensationBucketsA, compensationBucketsB []float64,
) (newSpans []Span, newBucketsA, newBucketsC []float64) {
	var (
		iSpan               = -1   // Index into spansA of the span at the current A position; -1 until the first B bucket is placed.
		iBucket             = -1   // Index into bucketsA at the current A position.
		iInSpan             int32  // Offset of the current A position within spansA[iSpan].
		indexA              int32  // Absolute bucket index of the current A position.
		indexB              int32  // Absolute bucket index of the current B bucket.
		bIdxB               int    // Index into bucketsB of the current B bucket.
		bucketB             float64 // Value of the current B bucket (negated if negative).
		compensationBucketB float64 // Kahan compensation of the current B bucket (negated if negative).
		deltaIndex          int32  // Distance in absolute bucket indices from the A position to indexB.
		lowerThanThreshold  = true // Still possibly below threshold? Checked only until the first bucket above it.
	)

	for _, spanB := range spansB {
		indexB += spanB.Offset
		for j := 0; j < int(spanB.Length); j++ {
			// Ignore B buckets whose upper limit is within the zero bucket
			// (only relevant for exponential schemas).
			if lowerThanThreshold && IsExponentialSchema(schema) && getBoundExponential(indexB, schema) <= threshold {
				goto nextLoop
			}
			lowerThanThreshold = false

			bucketB = bucketsB[bIdxB]
			if compensationBucketsB != nil {
				compensationBucketB = compensationBucketsB[bIdxB]
			}
			if negative {
				bucketB *= -1
				compensationBucketB *= -1
			}

			if iSpan == -1 {
				// Not yet positioned within A.
				if len(spansA) == 0 || spansA[0].Offset > indexB {
					// Add bucket before all others.
					bucketsA = append(bucketsA, 0)
					copy(bucketsA[1:], bucketsA)
					bucketsA[0] = bucketB
					compensationBucketsA = append(compensationBucketsA, 0)
					copy(compensationBucketsA[1:], compensationBucketsA)
					compensationBucketsA[0] = compensationBucketB
					if len(spansA) > 0 && spansA[0].Offset == indexB+1 {
						// New bucket is directly adjacent to the first span;
						// grow that span to the left.
						spansA[0].Length++
						spansA[0].Offset--
						goto nextLoop
					}
					// Otherwise prepend a new one-bucket span.
					spansA = append(spansA, Span{})
					copy(spansA[1:], spansA)
					spansA[0] = Span{Offset: indexB, Length: 1}
					if len(spansA) > 1 {
						// Convert the absolute offset in the formerly
						// first span to a relative offset.
						spansA[1].Offset -= indexB + 1
					}
					goto nextLoop
				} else if spansA[0].Offset == indexB {
					// Just add to first bucket, folding B's own compensation
					// term in with a second Kahan step when it is non-zero.
					bucketsA[0], compensationBucketsA[0] = kahansum.Inc(bucketB, bucketsA[0], compensationBucketsA[0])
					if compensationBucketB != 0 {
						bucketsA[0], compensationBucketsA[0] = kahansum.Inc(compensationBucketB, bucketsA[0], compensationBucketsA[0])
					}
					goto nextLoop
				}
				// Start scanning A from its first bucket.
				iSpan, iBucket, iInSpan = 0, 0, 0
				indexA = spansA[0].Offset
			}
			deltaIndex = indexB - indexA
			for {
				remainingInSpan := int32(spansA[iSpan].Length) - iInSpan
				if deltaIndex < remainingInSpan {
					// Bucket is in current span.
					iBucket += int(deltaIndex)
					iInSpan += deltaIndex
					bucketsA[iBucket], compensationBucketsA[iBucket] = kahansum.Inc(bucketB, bucketsA[iBucket], compensationBucketsA[iBucket])
					if compensationBucketB != 0 {
						bucketsA[iBucket], compensationBucketsA[iBucket] = kahansum.Inc(compensationBucketB, bucketsA[iBucket], compensationBucketsA[iBucket])
					}
					break
				}
				deltaIndex -= remainingInSpan
				iBucket += int(remainingInSpan)
				iSpan++
				if iSpan == len(spansA) || deltaIndex < spansA[iSpan].Offset {
					// Bucket is in gap behind previous span (or there are no further spans).
					bucketsA = append(bucketsA, 0)
					copy(bucketsA[iBucket+1:], bucketsA[iBucket:])
					bucketsA[iBucket] = bucketB
					compensationBucketsA = append(compensationBucketsA, 0)
					copy(compensationBucketsA[iBucket+1:], compensationBucketsA[iBucket:])
					compensationBucketsA[iBucket] = compensationBucketB
					switch {
					case deltaIndex == 0:
						// Directly after previous span, extend previous span.
						if iSpan < len(spansA) {
							spansA[iSpan].Offset--
						}
						iSpan--
						iInSpan = int32(spansA[iSpan].Length)
						spansA[iSpan].Length++
						goto nextLoop
					case iSpan < len(spansA) && deltaIndex == spansA[iSpan].Offset-1:
						// Directly before next span, extend next span.
						iInSpan = 0
						spansA[iSpan].Offset--
						spansA[iSpan].Length++
						goto nextLoop
					default:
						// No next span, or next span is not directly adjacent to new bucket.
						// Add new span.
						iInSpan = 0
						if iSpan < len(spansA) {
							spansA[iSpan].Offset -= deltaIndex + 1
						}
						spansA = append(spansA, Span{})
						copy(spansA[iSpan+1:], spansA[iSpan:])
						spansA[iSpan] = Span{Length: 1, Offset: deltaIndex}
						goto nextLoop
					}
				} else {
					// Try start of next span.
					deltaIndex -= spansA[iSpan].Offset
					iInSpan = 0
				}
			}

		nextLoop:
			indexA = indexB
			indexB++
			bIdxB++
		}
	}
	return spansA, bucketsA, compensationBucketsA
}
2025-10-05 15:38:07 -04:00
// floatBucketsMatch compares bucket values of two float histograms using binary float comparison
// and returns true if all values match.
func floatBucketsMatch ( b1 , b2 [ ] float64 ) bool {
2023-10-09 09:09:46 -04:00
if len ( b1 ) != len ( b2 ) {
return false
}
for i , b := range b1 {
if math . Float64bits ( b ) != math . Float64bits ( b2 [ i ] ) {
return false
}
}
return true
}
2023-11-08 08:43:05 -05:00
2025-10-17 19:03:52 -04:00
// detectResetWithMismatchedCustomBounds checks if any bucket count has decreased when
// comparing NHCBs with mismatched custom bounds. It maps both histograms
// to the intersected bounds on-the-fly and compares values without allocating
// arrays for all mapped buckets.
// Will panic if called with histograms that are not NHCB.
func (h *FloatHistogram) detectResetWithMismatchedCustomBounds(
	previous *FloatHistogram, currBounds, prevBounds []float64,
) bool {
	if h.Schema != CustomBucketsSchema || previous.Schema != CustomBucketsSchema {
		panic("detectResetWithMismatchedCustomBounds called with non-NHCB schema")
	}
	currIt := h.floatBucketIterator(true, 0, CustomBucketsSchema)
	prevIt := previous.floatBucketIterator(true, 0, CustomBucketsSchema)
	// rollupSumForBound sums all remaining buckets of iter whose upper
	// boundary is ≤ bound, starting from iterBucket if iteration has already
	// started. It returns the sum, the first bucket beyond the bound, and
	// whether such a bucket exists.
	rollupSumForBound := func(iter *floatBucketIterator, iterStarted bool, iterBucket Bucket[float64], bound float64) (float64, Bucket[float64], bool) {
		if !iterStarted {
			if !iter.Next() {
				return 0, Bucket[float64]{}, false
			}
			iterBucket = iter.At()
		}
		var sum float64
		for iterBucket.Upper <= bound {
			sum += iterBucket.Count
			if !iter.Next() {
				return sum, Bucket[float64]{}, false
			}
			iterBucket = iter.At()
		}
		return sum, iterBucket, true
	}
	var (
		currBoundIdx, prevBoundIdx   = 0, 0
		currBucket, prevBucket       Bucket[float64]
		currIterStarted, currHasMore bool
		prevIterStarted, prevHasMore bool
	)
	// Walk both sorted bound slices in lockstep. The index == len case stands
	// for the implicit +Inf bound terminating each custom layout, hence the
	// inclusive `<=` loop conditions.
	for currBoundIdx <= len(currBounds) && prevBoundIdx <= len(prevBounds) {
		currBound := math.Inf(1)
		if currBoundIdx < len(currBounds) {
			currBound = currBounds[currBoundIdx]
		}
		prevBound := math.Inf(1)
		if prevBoundIdx < len(prevBounds) {
			prevBound = prevBounds[prevBoundIdx]
		}
		switch {
		case currBound == prevBound:
			// Check matching bound, rolling up lesser buckets that have not been accounted for yet.
			currRollupSum := 0.0
			if !currIterStarted || currHasMore {
				currRollupSum, currBucket, currHasMore = rollupSumForBound(&currIt, currIterStarted, currBucket, currBound)
				currIterStarted = true
			}
			prevRollupSum := 0.0
			if !prevIterStarted || prevHasMore {
				prevRollupSum, prevBucket, prevHasMore = rollupSumForBound(&prevIt, prevIterStarted, prevBucket, currBound)
				prevIterStarted = true
			}
			if currRollupSum < prevRollupSum {
				// A mapped bucket shrank: counter reset detected.
				return true
			}
			currBoundIdx++
			prevBoundIdx++
		case currBound < prevBound:
			// Bound only exists in the current layout; skip it.
			currBoundIdx++
		default:
			// Bound only exists in the previous layout; skip it.
			prevBoundIdx++
		}
	}
	return false
}
// intersectCustomBucketBounds returns the intersection of two custom bucket
// boundary sets. Both inputs must be sorted in ascending order. The result is
// nil if either input is empty or the intersection is empty.
func intersectCustomBucketBounds(boundsA, boundsB []float64) []float64 {
	if len(boundsA) == 0 || len(boundsB) == 0 {
		return nil
	}
	var common []float64
	idxA, idxB := 0, 0
	for idxA < len(boundsA) && idxB < len(boundsB) {
		a, b := boundsA[idxA], boundsB[idxB]
		switch {
		case a == b:
			if common == nil {
				// Allocate a new slice because FloatHistogram.CustomValues has to be immutable.
				common = make([]float64, 0, min(len(boundsA), len(boundsB)))
			}
			common = append(common, a)
			idxA++
			idxB++
		case a < b:
			idxA++
		default:
			idxB++
		}
	}
	return common
}
// addCustomBucketsWithMismatches handles adding/subtracting custom bucket histograms
// with mismatched bucket layouts by mapping both to an intersected layout.
// It also processes the Kahan compensation term if provided.
func addCustomBucketsWithMismatches(
	negative bool,
	spansA []Span, bucketsA, boundsA []float64,
	spansB []Span, bucketsB, boundsB []float64,
	bucketsC []float64,
	intersectedBounds []float64,
) ([]Span, []float64, []float64) {
	// One target bucket per intersected bound plus the final +Inf bucket.
	targetBuckets := make([]float64, len(intersectedBounds)+1)
	cTargetBuckets := make([]float64, len(intersectedBounds)+1)

	// mapBuckets folds one source histogram's buckets into targetBuckets /
	// cTargetBuckets (closed over above) using Kahan summation. If
	// withCompensation is set, the source's own compensation terms
	// (bucketsC) are folded in as well.
	mapBuckets := func(spans []Span, buckets, bounds []float64, negative, withCompensation bool) {
		srcIdx := 0
		bucketIdx := 0
		intersectIdx := 0
		for _, span := range spans {
			srcIdx += int(span.Offset)
			for range span.Length {
				if bucketIdx < len(buckets) {
					value := buckets[bucketIdx]
					// Find target bucket index.
					targetIdx := len(targetBuckets) - 1 // Default to +Inf bucket.
					if srcIdx < len(bounds) {
						srcBound := bounds[srcIdx]
						// Since both arrays are sorted, we can continue from where we left off.
						for intersectIdx < len(intersectedBounds) {
							if intersectedBounds[intersectIdx] >= srcBound {
								targetIdx = intersectIdx
								break
							}
							intersectIdx++
						}
					}
					if negative {
						targetBuckets[targetIdx], cTargetBuckets[targetIdx] = kahansum.Dec(value, targetBuckets[targetIdx], cTargetBuckets[targetIdx])
					} else {
						targetBuckets[targetIdx], cTargetBuckets[targetIdx] = kahansum.Inc(value, targetBuckets[targetIdx], cTargetBuckets[targetIdx])
						if withCompensation && bucketsC != nil {
							targetBuckets[targetIdx], cTargetBuckets[targetIdx] = kahansum.Inc(bucketsC[bucketIdx], targetBuckets[targetIdx], cTargetBuckets[targetIdx])
						}
					}
				}
				srcIdx++
				bucketIdx++
			}
		}
	}

	// Map histograms to the intersected layout.
	mapBuckets(spansA, bucketsA, boundsA, false, true)
	mapBuckets(spansB, bucketsB, boundsB, negative, false)

	// Build spans and buckets, excluding zero-valued buckets from the final result.
	destSpans := spansA[:0]            // Reuse spansA capacity for destSpans since we don't need it anymore.
	destBuckets := targetBuckets[:0]   // Reuse targetBuckets capacity for destBuckets since it's guaranteed to be large enough.
	cDestBuckets := cTargetBuckets[:0] // Reuse cTargetBuckets capacity for cDestBuckets since it's guaranteed to be large enough.
	lastIdx := int32(-1)
	for i := range targetBuckets {
		// A bucket is only dropped if both its value and its compensation
		// term are zero.
		if targetBuckets[i] == 0 && cTargetBuckets[i] == 0 {
			continue
		}
		destBuckets = append(destBuckets, targetBuckets[i])
		cDestBuckets = append(cDestBuckets, cTargetBuckets[i])
		idx := int32(i)
		if len(destSpans) > 0 && idx == lastIdx+1 {
			// Consecutive bucket, extend the last span.
			destSpans[len(destSpans)-1].Length++
		} else {
			// New span needed.
			// TODO: optimize away small gaps.
			offset := idx
			if len(destSpans) > 0 {
				// Convert to relative offset from the end of the last span.
				prevEnd := lastIdx
				offset = idx - prevEnd - 1
			}
			destSpans = append(destSpans, Span{
				Offset: offset,
				Length: 1,
			})
		}
		lastIdx = idx
	}

	return destSpans, destBuckets, cDestBuckets
}
2023-11-08 08:43:05 -05:00
// ReduceResolution reduces the float histogram's spans, buckets into target schema.
Add histogram validation in remote-read and during reducing resolution (#17561)
ReduceResolution is currently called before validation during
ingestion. This will cause a panic if there are not enough buckets in
the histogram. If there are too many buckets, the spurious buckets are
ignored, and therefore the error in the input histogram is masked.
Furthermore, invalid negative offsets might cause problems, too.
Therefore, we need to do some minimal validation in reduceResolution.
Fortunately, it is easy and shouldn't slow things down. Sadly, it
requires to return errors, which triggers a bunch of code changes.
Even here is a bright side, we can get rud of a few panics. (Remember:
Don't panic!)
In different news, we haven't done a full validation of histograms
read via remote-read. This is not so much a security concern (as you
can throw off Prometheus easily by feeding it bogus data via
remote-read) but more that remote-read sources might be makeshift and
could accidentally create invalid histograms. We really don't want to
panic in that case. So this commit does not only add a check of the
spans and buckets as needed for resolution reduction but also a full
validation during remote-read.
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-20 18:22:24 -05:00
// An error is returned in the following cases:
// - The target schema is not smaller than the current histogram's schema.
// - The histogram has custom buckets.
// - The target schema is a custom buckets schema.
// - Any spans have an invalid offset.
// - The spans are inconsistent with the number of buckets.
func ( h * FloatHistogram ) ReduceResolution ( targetSchema int32 ) error {
// Note that the follow three returns are not returning a
// histogram.Error because they are programming errors.
2024-02-28 08:06:43 -05:00
if h . UsesCustomBuckets ( ) {
Add histogram validation in remote-read and during reducing resolution (#17561)
ReduceResolution is currently called before validation during
ingestion. This will cause a panic if there are not enough buckets in
the histogram. If there are too many buckets, the spurious buckets are
ignored, and therefore the error in the input histogram is masked.
Furthermore, invalid negative offsets might cause problems, too.
Therefore, we need to do some minimal validation in reduceResolution.
Fortunately, it is easy and shouldn't slow things down. Sadly, it
requires to return errors, which triggers a bunch of code changes.
Even here is a bright side, we can get rud of a few panics. (Remember:
Don't panic!)
In different news, we haven't done a full validation of histograms
read via remote-read. This is not so much a security concern (as you
can throw off Prometheus easily by feeding it bogus data via
remote-read) but more that remote-read sources might be makeshift and
could accidentally create invalid histograms. We really don't want to
panic in that case. So this commit does not only add a check of the
spans and buckets as needed for resolution reduction but also a full
validation during remote-read.
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-20 18:22:24 -05:00
return errors . New ( "cannot reduce resolution when there are custom buckets" )
2024-02-28 08:06:43 -05:00
}
if IsCustomBucketsSchema ( targetSchema ) {
Add histogram validation in remote-read and during reducing resolution (#17561)
ReduceResolution is currently called before validation during
ingestion. This will cause a panic if there are not enough buckets in
the histogram. If there are too many buckets, the spurious buckets are
ignored, and therefore the error in the input histogram is masked.
Furthermore, invalid negative offsets might cause problems, too.
Therefore, we need to do some minimal validation in reduceResolution.
Fortunately, it is easy and shouldn't slow things down. Sadly, it
requires to return errors, which triggers a bunch of code changes.
Even here is a bright side, we can get rud of a few panics. (Remember:
Don't panic!)
In different news, we haven't done a full validation of histograms
read via remote-read. This is not so much a security concern (as you
can throw off Prometheus easily by feeding it bogus data via
remote-read) but more that remote-read sources might be makeshift and
could accidentally create invalid histograms. We really don't want to
panic in that case. So this commit does not only add a check of the
spans and buckets as needed for resolution reduction but also a full
validation during remote-read.
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-20 18:22:24 -05:00
return errors . New ( "cannot reduce resolution to custom buckets schema" )
2024-02-28 08:06:43 -05:00
}
2023-11-25 04:38:15 -05:00
if targetSchema >= h . Schema {
Add histogram validation in remote-read and during reducing resolution (#17561)
ReduceResolution is currently called before validation during
ingestion. This will cause a panic if there are not enough buckets in
the histogram. If there are too many buckets, the spurious buckets are
ignored, and therefore the error in the input histogram is masked.
Furthermore, invalid negative offsets might cause problems, too.
Therefore, we need to do some minimal validation in reduceResolution.
Fortunately, it is easy and shouldn't slow things down. Sadly, it
requires to return errors, which triggers a bunch of code changes.
Even here is a bright side, we can get rud of a few panics. (Remember:
Don't panic!)
In different news, we haven't done a full validation of histograms
read via remote-read. This is not so much a security concern (as you
can throw off Prometheus easily by feeding it bogus data via
remote-read) but more that remote-read sources might be makeshift and
could accidentally create invalid histograms. We really don't want to
panic in that case. So this commit does not only add a check of the
spans and buckets as needed for resolution reduction but also a full
validation during remote-read.
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-20 18:22:24 -05:00
return fmt . Errorf ( "cannot reduce resolution from schema %d to %d" , h . Schema , targetSchema )
2023-11-25 04:38:15 -05:00
}
Add histogram validation in remote-read and during reducing resolution (#17561)
ReduceResolution is currently called before validation during
ingestion. This will cause a panic if there are not enough buckets in
the histogram. If there are too many buckets, the spurious buckets are
ignored, and therefore the error in the input histogram is masked.
Furthermore, invalid negative offsets might cause problems, too.
Therefore, we need to do some minimal validation in reduceResolution.
Fortunately, it is easy and shouldn't slow things down. Sadly, it
requires to return errors, which triggers a bunch of code changes.
Even here is a bright side, we can get rud of a few panics. (Remember:
Don't panic!)
In different news, we haven't done a full validation of histograms
read via remote-read. This is not so much a security concern (as you
can throw off Prometheus easily by feeding it bogus data via
remote-read) but more that remote-read sources might be makeshift and
could accidentally create invalid histograms. We really don't want to
panic in that case. So this commit does not only add a check of the
spans and buckets as needed for resolution reduction but also a full
validation during remote-read.
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-20 18:22:24 -05:00
var err error
if h . PositiveSpans , h . PositiveBuckets , err = reduceResolution (
h . PositiveSpans , h . PositiveBuckets , h . Schema , targetSchema , false , true ,
) ; err != nil {
return err
}
if h . NegativeSpans , h . NegativeBuckets , err = reduceResolution (
h . NegativeSpans , h . NegativeBuckets , h . Schema , targetSchema , false , true ,
) ; err != nil {
return err
}
2023-11-26 13:12:44 -05:00
2023-11-10 08:33:34 -05:00
h . Schema = targetSchema
Add histogram validation in remote-read and during reducing resolution (#17561)
ReduceResolution is currently called before validation during
ingestion. This will cause a panic if there are not enough buckets in
the histogram. If there are too many buckets, the spurious buckets are
ignored, and therefore the error in the input histogram is masked.
Furthermore, invalid negative offsets might cause problems, too.
Therefore, we need to do some minimal validation in reduceResolution.
Fortunately, it is easy and shouldn't slow things down. Sadly, it
requires to return errors, which triggers a bunch of code changes.
Even here is a bright side, we can get rud of a few panics. (Remember:
Don't panic!)
In different news, we haven't done a full validation of histograms
read via remote-read. This is not so much a security concern (as you
can throw off Prometheus easily by feeding it bogus data via
remote-read) but more that remote-read sources might be makeshift and
could accidentally create invalid histograms. We really don't want to
panic in that case. So this commit does not only add a check of the
spans and buckets as needed for resolution reduction but also a full
validation during remote-read.
Signed-off-by: beorn7 <beorn@grafana.com>
2025-11-20 18:22:24 -05:00
return nil
2023-11-08 08:43:05 -05:00
}
2025-10-08 20:10:07 -04:00
2026-02-07 18:52:22 -05:00
// kahanReduceResolution works like reduceResolution, but it is specialized for
// FloatHistogram's KahanAdd method. Unlike reduceResolution, which supports
// both float and integer buckets, this function only operates on float
// buckets. It additionally maintains a slice of Kahan compensation values
// alongside the regular bucket counts: when origin buckets are merged into one
// target bucket, both the value and its compensation are folded in via
// kahansum.Inc so no precision is lost.
// When inplace is true, the origin slices back the returned slices (and are
// therefore modified).
func kahanReduceResolution(
	originSpans []Span,
	originReceivingBuckets []float64,
	originCompensationBuckets []float64,
	originSchema,
	targetSchema int32,
	inplace bool,
) (newSpans []Span, newReceivingBuckets, newCompensationBuckets []float64) {
	var (
		outSpans   []Span    // Spans in the target schema.
		outVals    []float64 // Receiving bucket counts in the target schema.
		outComps   []float64 // Compensation bucket counts in the target schema.
		srcIdx     int32     // Bucket index in the origin schema.
		srcPos     int       // Position within the origin bucket count slices.
		dstIdx     int32     // Bucket index in the target schema.
		lastDstIdx int32     // Index of the most recently added target bucket.
	)
	if inplace {
		// Slice reuse is safe because when reducing the resolution, the
		// target slices never grow faster than the origin slices are read.
		outSpans = originSpans[:0]
		outVals = originReceivingBuckets[:0]
		outComps = originCompensationBuckets[:0]
	}
	for _, span := range originSpans {
		// Advance to the index of the first bucket in this span.
		srcIdx += span.Offset
		for j := uint32(0); j < span.Length; j++ {
			// Translate the origin bucket index into the target schema.
			dstIdx = targetIdx(srcIdx, originSchema, targetSchema)
			switch {
			case len(outSpans) == 0:
				// Very first target span.
				outSpans = append(outSpans, Span{Offset: dstIdx, Length: 1})
				outVals = append(outVals, originReceivingBuckets[srcPos])
				outComps = append(outComps, originCompensationBuckets[srcPos])
				lastDstIdx = dstIdx
			case lastDstIdx == dstIdx:
				// Merge into the same target bucket as the previous origin
				// bucket, Kahan-adding both the value and its compensation.
				last := len(outVals) - 1
				outVals[last], outComps[last] = kahansum.Inc(
					originReceivingBuckets[srcPos], outVals[last], outComps[last],
				)
				outVals[last], outComps[last] = kahansum.Inc(
					originCompensationBuckets[srcPos], outVals[last], outComps[last],
				)
			case lastDstIdx+1 == dstIdx:
				// New target bucket directly adjacent to the previous one:
				// extend the current target span.
				outSpans[len(outSpans)-1].Length++
				lastDstIdx++
				outVals = append(outVals, originReceivingBuckets[srcPos])
				outComps = append(outComps, originCompensationBuckets[srcPos])
			case lastDstIdx+1 < dstIdx:
				// New target bucket separated by a gap from the previous one:
				// start a new target span.
				outSpans = append(outSpans, Span{Offset: dstIdx - lastDstIdx - 1, Length: 1})
				lastDstIdx = dstIdx
				outVals = append(outVals, originReceivingBuckets[srcPos])
				outComps = append(outComps, originCompensationBuckets[srcPos])
			}
			srcIdx++
			srcPos++
		}
	}
	return outSpans, outVals, outComps
}
// newCompensationHistogram builds the zero-valued companion histogram used to
// carry Kahan-summation compensation terms alongside h.
// The result mirrors h's layout — schema, zero threshold, custom values — and
// shares h's span slices, while all of its bucket counts start at zero.
func (h *FloatHistogram) newCompensationHistogram() *FloatHistogram {
	comp := &FloatHistogram{
		CounterResetHint: h.CounterResetHint,
		Schema:           h.Schema,
		ZeroThreshold:    h.ZeroThreshold,
		CustomValues:     h.CustomValues,
		// Spans are shared with the receiving histogram; only the bucket
		// counts are fresh, zero-initialized slices.
		PositiveSpans:   h.PositiveSpans,
		NegativeSpans:   h.NegativeSpans,
		PositiveBuckets: make([]float64, len(h.PositiveBuckets)),
	}
	// Custom-bucket (NHCB) histograms carry no negative buckets.
	if !h.UsesCustomBuckets() {
		comp.NegativeBuckets = make([]float64, len(h.NegativeBuckets))
	}
	return comp
}
2025-10-08 20:10:07 -04:00
// checkSchemaAndBounds checks if two histograms are compatible because they
2025-10-17 19:03:52 -04:00
// both use a standard exponential schema or because they both are NHCBs.
2025-10-08 20:10:07 -04:00
func ( h * FloatHistogram ) checkSchemaAndBounds ( other * FloatHistogram ) error {
if h . UsesCustomBuckets ( ) != other . UsesCustomBuckets ( ) {
return ErrHistogramsIncompatibleSchema
}
return nil
}
// adjustCounterReset is used for addition and subtraction. Those operations
// are usually only performed between gauge histograms, but if one or both are
// counters, we try to at least set the counter reset hint to something
// meaningful (see the comments on each branch). The returned
// counterResetCollision is true if one histogram has a counter reset hint of
// CounterReset and the other NotCounterReset; all other combinations are not
// considered a collision.
func (h *FloatHistogram) adjustCounterReset(other *FloatHistogram) (counterResetCollision bool) {
	if other.CounterResetHint == h.CounterResetHint {
		// Adding apples to apples, all good. No need to change anything.
		return false
	}
	if h.CounterResetHint == GaugeType {
		// Adding something else to a gauge. That's probably OK. Outcome is a
		// gauge, and the receiver is already marked as one.
		return false
	}
	if other.CounterResetHint == GaugeType {
		// Similar to before, but this time the receiver is "something else",
		// so we have to change it to gauge.
		h.CounterResetHint = GaugeType
		return false
	}
	if h.CounterResetHint == UnknownCounterReset {
		// With the receiver's CounterResetHint being "unknown", this could
		// still be legitimate if the caller knows what they are doing. The
		// outcome stays "unknown", which the receiver already is.
		return false
	}
	if other.CounterResetHint == UnknownCounterReset {
		// Similar to before, but now the receiver must be set to "unknown".
		h.CounterResetHint = UnknownCounterReset
		return false
	}
	// All remaining combinations are a direct collision of CounterReset and
	// NotCounterReset, which shouldn't actually happen. Conservatively set
	// the hint to "unknown" and report the collision so a warning can be
	// issued.
	h.CounterResetHint = UnknownCounterReset
	return true
}
2026-02-07 18:52:22 -05:00
// HasOverflow reports whether any of the FloatHistogram's fields contain an
// infinite value. This can happen when aggregating multiple histograms and
// exceeding float64 capacity.
func (h *FloatHistogram) HasOverflow() bool {
	// anyInf reports whether any element of vals is +Inf or -Inf.
	anyInf := func(vals []float64) bool {
		for _, v := range vals {
			if math.IsInf(v, 0) {
				return true
			}
		}
		return false
	}
	return math.IsInf(h.ZeroCount, 0) ||
		math.IsInf(h.Count, 0) ||
		math.IsInf(h.Sum, 0) ||
		anyInf(h.PositiveBuckets) ||
		anyInf(h.NegativeBuckets) ||
		anyInf(h.CustomValues)
}
2026-02-25 03:10:42 -05:00
// TrimBuckets trims native histogram buckets.
// It returns a copy of the histogram with observations beyond the trim point
// rhs removed:
//   - isUpperTrim == true (TRIM_UPPER, histogram </ float): observations
//     above rhs are dropped.
//   - isUpperTrim == false (TRIM_LOWER, histogram >/ float): observations
//     below rhs are dropped.
//
// Buckets containing the trim point are split by interpolation (linear for
// custom buckets, exponential otherwise, via computeBucketTrim), and Count and
// Sum are recomputed from the surviving buckets using bucket midpoints to
// estimate the sum. The receiver itself is not modified.
func (h *FloatHistogram) TrimBuckets(rhs float64, isUpperTrim bool) *FloatHistogram {
	var (
		trimmedHist              = h.Copy()
		updatedCount, updatedSum float64
		trimmedBuckets           bool // True once any bucket count was actually changed.
		isCustomBucket           = trimmedHist.UsesCustomBuckets()
		hasPositive, hasNegative bool // Whether any non-empty positive/negative buckets exist.
	)
	// NOTE: in each loop below, i indexes the bucket count slice in lockstep
	// with the iterator — assumes the iterator visits populated buckets in
	// slice order.
	if isUpperTrim {
		// Calculate the fraction to keep for buckets that contain the trim value.
		// For TRIM_UPPER, we keep observations below the trim point (rhs).
		// Example: histogram </ float.
		for i, iter := 0, trimmedHist.PositiveBucketIterator(); iter.Next(); i++ {
			bucket := iter.At()
			if bucket.Count == 0 {
				continue
			}
			hasPositive = true
			switch {
			case bucket.Upper <= rhs:
				// Bucket is entirely below the trim point - keep all.
				updatedCount += bucket.Count
				bucketMidpoint := computeMidpoint(bucket.Lower, bucket.Upper, true, isCustomBucket)
				updatedSum += bucketMidpoint * bucket.Count
			case bucket.Lower < rhs:
				// Bucket contains the trim point - interpolate.
				keepCount, bucketMidpoint := computeBucketTrim(bucket, rhs, isUpperTrim, true, isCustomBucket)
				updatedCount += keepCount
				updatedSum += bucketMidpoint * keepCount
				if trimmedHist.PositiveBuckets[i] != keepCount {
					trimmedHist.PositiveBuckets[i] = keepCount
					trimmedBuckets = true
				}
			default:
				// Bucket is entirely above the trim point - discard.
				trimmedHist.PositiveBuckets[i] = 0
				trimmedBuckets = true
			}
		}
		for i, iter := 0, trimmedHist.NegativeBucketIterator(); iter.Next(); i++ {
			bucket := iter.At()
			if bucket.Count == 0 {
				continue
			}
			hasNegative = true
			switch {
			case bucket.Upper <= rhs:
				// Bucket is entirely below the trim point - keep all.
				updatedCount += bucket.Count
				bucketMidpoint := computeMidpoint(bucket.Lower, bucket.Upper, false, isCustomBucket)
				updatedSum += bucketMidpoint * bucket.Count
			case bucket.Lower < rhs:
				// Bucket contains the trim point - interpolate.
				keepCount, bucketMidpoint := computeBucketTrim(bucket, rhs, isUpperTrim, false, isCustomBucket)
				updatedCount += keepCount
				updatedSum += bucketMidpoint * keepCount
				if trimmedHist.NegativeBuckets[i] != keepCount {
					trimmedHist.NegativeBuckets[i] = keepCount
					trimmedBuckets = true
				}
			default:
				// Bucket is entirely above the trim point - discard.
				trimmedHist.NegativeBuckets[i] = 0
				trimmedBuckets = true
			}
		}
	} else { // !isUpperTrim
		// For TRIM_LOWER, we keep observations above the trim point (rhs).
		// Example: histogram >/ float.
		for i, iter := 0, trimmedHist.PositiveBucketIterator(); iter.Next(); i++ {
			bucket := iter.At()
			if bucket.Count == 0 {
				continue
			}
			hasPositive = true
			switch {
			case bucket.Lower >= rhs:
				// Bucket is entirely above the trim point - keep all.
				updatedCount += bucket.Count
				bucketMidpoint := computeMidpoint(bucket.Lower, bucket.Upper, true, isCustomBucket)
				updatedSum += bucketMidpoint * bucket.Count
			case bucket.Upper > rhs:
				// Bucket contains the trim point - interpolate.
				keepCount, bucketMidpoint := computeBucketTrim(bucket, rhs, isUpperTrim, true, isCustomBucket)
				updatedCount += keepCount
				updatedSum += bucketMidpoint * keepCount
				if trimmedHist.PositiveBuckets[i] != keepCount {
					trimmedHist.PositiveBuckets[i] = keepCount
					trimmedBuckets = true
				}
			default:
				// Bucket is entirely below the trim point - discard.
				trimmedHist.PositiveBuckets[i] = 0
				trimmedBuckets = true
			}
		}
		for i, iter := 0, trimmedHist.NegativeBucketIterator(); iter.Next(); i++ {
			bucket := iter.At()
			if bucket.Count == 0 {
				continue
			}
			hasNegative = true
			switch {
			case bucket.Lower >= rhs:
				// Bucket is entirely above the trim point - keep all.
				updatedCount += bucket.Count
				bucketMidpoint := computeMidpoint(bucket.Lower, bucket.Upper, false, isCustomBucket)
				updatedSum += bucketMidpoint * bucket.Count
			case bucket.Upper > rhs:
				// Bucket contains the trim point - interpolate.
				keepCount, bucketMidpoint := computeBucketTrim(bucket, rhs, isUpperTrim, false, isCustomBucket)
				updatedCount += keepCount
				updatedSum += bucketMidpoint * keepCount
				if trimmedHist.NegativeBuckets[i] != keepCount {
					trimmedHist.NegativeBuckets[i] = keepCount
					trimmedBuckets = true
				}
			default:
				// Bucket is entirely below the trim point - discard.
				trimmedHist.NegativeBuckets[i] = 0
				trimmedBuckets = true
			}
		}
	}
	// Handle the zero count bucket. Its effective bounds depend on which
	// signs of regular buckets are populated (see computeZeroBucketTrim).
	if trimmedHist.ZeroCount > 0 {
		keepCount, bucketMidpoint := computeZeroBucketTrim(trimmedHist.ZeroBucket(), rhs, hasNegative, hasPositive, isUpperTrim)
		if trimmedHist.ZeroCount != keepCount {
			trimmedHist.ZeroCount = keepCount
			trimmedBuckets = true
		}
		updatedSum += bucketMidpoint * keepCount
		updatedCount += keepCount
	}
	if trimmedBuckets {
		// Only update the totals in case some bucket(s) were fully (or partially) trimmed.
		// Compact(0) drops the now-empty buckets from the span layout.
		trimmedHist.Count = updatedCount
		trimmedHist.Sum = updatedSum
		trimmedHist.Compact(0)
	}
	return trimmedHist
}
// handleInfinityBuckets computes the trim result for a bucket with an
// infinite lower or upper bound. It returns the surviving count and the
// midpoint to use for sum estimation. Because the distribution of values
// inside an infinite bucket is unknown, partial trims of such buckets
// conservatively drop the entire bucket, except for NHCB-style buckets where
// the infinite bound can be treated as 0 and linear interpolation applies.
// Panics if neither bound is infinite.
func handleInfinityBuckets(isUpperTrim bool, b Bucket[float64], rhs float64) (underCount, bucketMidpoint float64) {
	// zeroIfInf collapses an infinite bound to 0 so it can serve as a midpoint.
	zeroIfInf := func(x float64) float64 {
		if math.IsInf(x, 0) {
			return 0
		}
		return x
	}
	// Case 1: Bucket with lower bound -Inf.
	if math.IsInf(b.Lower, -1) {
		// TRIM_UPPER (</) - remove values greater than rhs
		if isUpperTrim {
			if rhs >= b.Upper {
				// As the rhs is greater than the upper bound, we keep the entire current bucket.
				return b.Count, 0
			}
			if rhs > 0 && b.Upper > 0 && !math.IsInf(b.Upper, 1) {
				// If upper is finite and positive, we treat lower as 0 (despite it de facto being -Inf).
				// This is only possible with NHCB, so we can always use linear interpolation.
				return b.Count * rhs / b.Upper, rhs / 2
			}
			if b.Upper <= 0 {
				// NOTE(review): the whole bucket is kept here even though
				// rhs < b.Upper — presumably intentional for non-positive
				// upper bounds, but worth confirming.
				return b.Count, rhs
			}
			// Otherwise, we are targeting a valid trim, but as we don't know the exact distribution of values that belongs to an infinite bucket, we need to remove the entire bucket.
			return 0, zeroIfInf(b.Upper)
		}
		// TRIM_LOWER (>/) - remove values less than rhs
		if rhs <= b.Lower {
			// Impossible to happen because the lower bound is -Inf. Returning the entire current bucket.
			return b.Count, 0
		}
		if rhs >= 0 && b.Upper > rhs && !math.IsInf(b.Upper, 1) {
			// If upper is finite and positive, we treat lower as 0 (despite it de facto being -Inf).
			// This is only possible with NHCB, so we can always use linear interpolation.
			return b.Count * (1 - rhs/b.Upper), (rhs + b.Upper) / 2
		}
		// Otherwise, we are targeting a valid trim, but as we don't know the exact distribution of values that belongs to an infinite bucket, we need to remove the entire bucket.
		return 0, zeroIfInf(b.Upper)
	}
	// Case 2: Bucket with upper bound +Inf.
	if math.IsInf(b.Upper, 1) {
		if isUpperTrim {
			// TRIM_UPPER (</) - remove values greater than rhs.
			// We don't care about lower here, because:
			// when rhs >= lower and the bucket extends to +Inf, some values in this bucket could be > rhs, so we conservatively remove the entire bucket;
			// when rhs < lower, all values in this bucket are >= lower > rhs, so all values should be removed.
			return 0, zeroIfInf(b.Lower)
		}
		// TRIM_LOWER (>/) - remove values less than rhs.
		if rhs >= b.Lower {
			// NOTE(review): this keeps the whole bucket when rhs >= lower,
			// which contradicts the comment below ("lower < rhs ... remove").
			// Confirm whether the condition or the comment is intended.
			return b.Count, rhs
		}
		// lower < rhs: we are inside the infinity bucket, but as we don't know the exact distribution of values, we conservatively remove the entire bucket.
		return 0, zeroIfInf(b.Lower)
	}
	panic(fmt.Errorf("one of the bounds must be infinite for handleInfinityBuckets, got %v", b))
}
// computeSplit calculates the portion of the bucket's count that lies at or
// below rhs (the trim point). With isLinear, the split is interpolated
// linearly between the bucket bounds; otherwise it is interpolated
// exponentially in log2 space, mirroring the exponential bucket layout.
func computeSplit(b Bucket[float64], rhs float64, isPositive, isLinear bool) float64 {
	// Trim point outside the bucket: nothing or everything is below it.
	if rhs <= b.Lower {
		return 0
	}
	if rhs >= b.Upper {
		return b.Count
	}
	var fraction float64
	if isLinear {
		fraction = (rhs - b.Lower) / (b.Upper - b.Lower)
	} else {
		// Exponential interpolation: position rhs between the bounds on a
		// log2 scale.
		logLower := math.Log2(math.Abs(b.Lower))
		logUpper := math.Log2(math.Abs(b.Upper))
		logV := math.Log2(math.Abs(rhs))
		if isPositive {
			fraction = (logV - logLower) / (logUpper - logLower)
		} else {
			fraction = 1 - ((logV - logUpper) / (logLower - logUpper))
		}
	}
	return b.Count * fraction
}
// computeZeroBucketTrim computes how much of the zero bucket survives a trim
// at rhs, returning the kept count and the midpoint of the surviving interval
// (for sum estimation). When only one sign of regular buckets is populated,
// the zero bucket is treated as extending only to that side.
func computeZeroBucketTrim(zeroBucket Bucket[float64], rhs float64, hasNegative, hasPositive, isUpperTrim bool) (float64, float64) {
	lower, upper := zeroBucket.Lower, zeroBucket.Upper
	if hasNegative && !hasPositive {
		upper = 0
	}
	if hasPositive && !hasNegative {
		lower = 0
	}
	if isUpperTrim {
		// Keep everything at or below rhs.
		if rhs <= lower {
			return 0, 0
		}
		if rhs >= upper {
			return zeroBucket.Count, (lower + upper) / 2
		}
		fraction := (rhs - lower) / (upper - lower)
		return zeroBucket.Count * fraction, (lower + rhs) / 2
	}
	// Lower trim: keep everything at or above rhs.
	if rhs <= lower {
		return zeroBucket.Count, (lower + upper) / 2
	}
	if rhs >= upper {
		return 0, 0
	}
	fraction := (upper - rhs) / (upper - lower)
	return zeroBucket.Count * fraction, (rhs + upper) / 2
}
// computeBucketTrim computes the surviving count and the midpoint of the
// surviving interval for a bucket that contains the trim point rhs.
// Buckets with an infinite bound are delegated to handleInfinityBuckets.
func computeBucketTrim(b Bucket[float64], rhs float64, isUpperTrim, isPositive, isCustomBucket bool) (float64, float64) {
	if math.IsInf(b.Lower, -1) || math.IsInf(b.Upper, 1) {
		return handleInfinityBuckets(isUpperTrim, b, rhs)
	}
	// Portion of the count at or below rhs.
	underCount := computeSplit(b, rhs, isPositive, isCustomBucket)
	if !isUpperTrim {
		// Lower trim keeps the part above rhs.
		return b.Count - underCount, computeMidpoint(rhs, b.Upper, isPositive, isCustomBucket)
	}
	return underCount, computeMidpoint(b.Lower, rhs, isPositive, isCustomBucket)
}
// computeMidpoint returns a representative value for the surviving interval
// [lower, upper]: the arithmetic mean when isLinear, otherwise the geometric
// mean (negated for negative buckets). Infinite bounds are collapsed: if both
// bounds are infinite the result is 0; a single infinite bound falls back to
// the finite bound (halved for an infinite lower bound with a positive upper
// bound).
func computeMidpoint(survivingIntervalLowerBound, survivingIntervalUpperBound float64, isPositive, isLinear bool) float64 {
	lo, hi := survivingIntervalLowerBound, survivingIntervalUpperBound
	switch {
	case math.IsInf(lo, 0) && math.IsInf(hi, 0):
		return 0
	case math.IsInf(lo, 0):
		if hi > 0 {
			return hi / 2
		}
		return hi
	case math.IsInf(hi, 0):
		return lo
	}
	if isLinear {
		return (lo + hi) / 2
	}
	geoMean := math.Sqrt(math.Abs(lo * hi))
	if !isPositive {
		return -geoMean
	}
	return geoMean
}