Add histogram validation in remote-read and during reducing resolution (#17561)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

ReduceResolution is currently called before validation during
ingestion. This will cause a panic if there are not enough buckets in
the histogram. If there are too many buckets, the spurious buckets are
ignored, and therefore the error in the input histogram is masked.

Furthermore, invalid negative offsets might cause problems, too.

Therefore, we need to do some minimal validation in reduceResolution.
Fortunately, it is easy and shouldn't slow things down. Sadly, it
requires to return errors, which triggers a bunch of code changes.
Even here is a bright side, we can get rud of a few panics. (Remember:
Don't panic!)

In different news, we haven't done a full validation of histograms
read via remote-read. This is not so much a security concern (as you
can throw off Prometheus easily by feeding it bogus data via
remote-read) but more that remote-read sources might be makeshift and
could accidentally create invalid histograms. We really don't want to
panic in that case. So this commit does not only add a check of the
spans and buckets as needed for resolution reduction but also a full
validation during remote-read.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
Björn Rabenstein 2025-11-21 00:22:24 +01:00 committed by GitHub
parent fc27eef43f
commit b8d19543b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 395 additions and 115 deletions

View file

@ -164,8 +164,8 @@ func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram {
Sum: h.Sum,
}
c.PositiveSpans, c.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, false)
c.NegativeSpans, c.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, false)
c.PositiveSpans, c.PositiveBuckets = mustReduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, false)
c.NegativeSpans, c.NegativeBuckets = mustReduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, false)
return &c
}
@ -393,13 +393,13 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte
switch {
case other.Schema < h.Schema:
hPositiveSpans, hPositiveBuckets = reduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true)
hNegativeSpans, hNegativeBuckets = reduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true)
hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true)
hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true)
h.Schema = other.Schema
case other.Schema > h.Schema:
otherPositiveSpans, otherPositiveBuckets = reduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
otherNegativeSpans, otherNegativeBuckets = reduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
}
h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
@ -459,12 +459,12 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte
switch {
case other.Schema < h.Schema:
hPositiveSpans, hPositiveBuckets = reduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true)
hNegativeSpans, hNegativeBuckets = reduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true)
hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true)
hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true)
h.Schema = other.Schema
case other.Schema > h.Schema:
otherPositiveSpans, otherPositiveBuckets = reduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
otherNegativeSpans, otherNegativeBuckets = reduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
}
h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
@ -1582,25 +1582,40 @@ func addCustomBucketsWithMismatches(
}
// ReduceResolution reduces the float histogram's spans, buckets into target schema.
// The target schema must be smaller than the current float histogram's schema.
// This will panic if the histogram has custom buckets or if the target schema is
// a custom buckets schema.
func (h *FloatHistogram) ReduceResolution(targetSchema int32) *FloatHistogram {
// An error is returned in the following cases:
// - The target schema is not smaller than the current histogram's schema.
// - The histogram has custom buckets.
// - The target schema is a custom buckets schema.
// - Any spans have an invalid offset.
// - The spans are inconsistent with the number of buckets.
func (h *FloatHistogram) ReduceResolution(targetSchema int32) error {
// Note that the follow three returns are not returning a
// histogram.Error because they are programming errors.
if h.UsesCustomBuckets() {
panic("cannot reduce resolution when there are custom buckets")
return errors.New("cannot reduce resolution when there are custom buckets")
}
if IsCustomBucketsSchema(targetSchema) {
panic("cannot reduce resolution to custom buckets schema")
return errors.New("cannot reduce resolution to custom buckets schema")
}
if targetSchema >= h.Schema {
panic(fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema))
return fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema)
}
h.PositiveSpans, h.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, true)
h.NegativeSpans, h.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, true)
var err error
if h.PositiveSpans, h.PositiveBuckets, err = reduceResolution(
h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, true,
); err != nil {
return err
}
if h.NegativeSpans, h.NegativeBuckets, err = reduceResolution(
h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, true,
); err != nil {
return err
}
h.Schema = targetSchema
return h
return nil
}
// checkSchemaAndBounds checks if two histograms are compatible because they

View file

@ -4141,14 +4141,16 @@ func createRandomSpans(rng *rand.Rand, spanNum int32) ([]Span, []float64) {
func TestFloatHistogramReduceResolution(t *testing.T) {
tcs := map[string]struct {
origin *FloatHistogram
target *FloatHistogram
origin *FloatHistogram
targetSchema int32
target *FloatHistogram
errorMsg string
}{
"valid float histogram": {
origin: &FloatHistogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: 0, Length: 4},
{Offset: -2, Length: 4},
{Offset: 0, Length: 0},
{Offset: 3, Length: 2},
},
@ -4160,10 +4162,11 @@ func TestFloatHistogramReduceResolution(t *testing.T) {
},
NegativeBuckets: []float64{1, 3, 1, 2, 1, 1},
},
targetSchema: -1,
target: &FloatHistogram{
Schema: -1,
PositiveSpans: []Span{
{Offset: 0, Length: 3},
{Offset: -1, Length: 3},
{Offset: 1, Length: 1},
},
PositiveBuckets: []float64{1, 4, 2, 2},
@ -4174,12 +4177,58 @@ func TestFloatHistogramReduceResolution(t *testing.T) {
NegativeBuckets: []float64{1, 4, 2, 2},
},
},
"not enough buckets": {
origin: &FloatHistogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: -2, Length: 4},
{Offset: 0, Length: 0},
{Offset: 3, Length: 2},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1},
},
targetSchema: -1,
errorMsg: "have 5 buckets but spans need more: histogram spans specify different number of buckets than provided",
},
"too many buckets": {
origin: &FloatHistogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: -2, Length: 4},
{Offset: 0, Length: 0},
{Offset: 3, Length: 2},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 5},
},
targetSchema: -1,
errorMsg: "spans need 6 buckets, have 7 buckets: histogram spans specify different number of buckets than provided",
},
"negative offset": {
origin: &FloatHistogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: -2, Length: 4},
{Offset: -1, Length: 0},
{Offset: 3, Length: 2},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1},
},
targetSchema: -1,
errorMsg: "span number 2 with offset -1: histogram has a span whose offset is negative",
},
}
for _, tc := range tcs {
target := tc.origin.ReduceResolution(tc.target.Schema)
require.Equal(t, tc.target, target)
// Check that receiver histogram was mutated:
require.Equal(t, tc.target, tc.origin)
for tn, tc := range tcs {
t.Run(tn, func(t *testing.T) {
err := tc.origin.ReduceResolution(tc.targetSchema)
if tc.errorMsg != "" {
require.Equal(t, tc.errorMsg, err.Error())
// The returned error should be a histogram.Error.
require.ErrorAs(t, err, &Error{})
return
}
require.NoError(t, err)
require.Equal(t, tc.target, tc.origin)
})
}
}

View file

@ -738,6 +738,8 @@ var exponentialBounds = [][]float64{
// deltas. Set it to false if the buckets contain absolute counts.
// Set inplace to true to reuse input slices and avoid allocations (otherwise
// new slices will be allocated for result).
// The functions returns an error if there are too many or too few buckets for the spans
// or if any span except the first has a negative offset.
func reduceResolution[IBC InternalBucketCount](
originSpans []Span,
originBuckets []IBC,
@ -745,7 +747,7 @@ func reduceResolution[IBC InternalBucketCount](
targetSchema int32,
deltaBuckets bool,
inplace bool,
) ([]Span, []IBC) {
) ([]Span, []IBC, error) {
var (
targetSpans []Span // The spans in the target schema.
targetBuckets []IBC // The bucket counts in the target schema.
@ -764,10 +766,18 @@ func reduceResolution[IBC InternalBucketCount](
targetBuckets = originBuckets[:0]
}
for _, span := range originSpans {
for n, span := range originSpans {
if n > 0 && span.Offset < 0 {
return nil, nil, fmt.Errorf("span number %d with offset %d: %w", n+1, span.Offset, ErrHistogramSpanNegativeOffset)
}
// Determine the index of the first bucket in this span.
bucketIdx += span.Offset
for j := 0; j < int(span.Length); j++ {
// Protect against too few buckets in the origin.
if bucketCountIdx >= len(originBuckets) {
return nil, nil, fmt.Errorf("have %d buckets but spans need more: %w", len(originBuckets), ErrHistogramSpansBucketsMismatch)
}
// Determine the index of the bucket in the target schema from the index in the original schema.
targetBucketIdx = targetIdx(bucketIdx, originSchema, targetSchema)
@ -826,12 +836,33 @@ func reduceResolution[IBC InternalBucketCount](
targetBuckets = append(targetBuckets, originBuckets[bucketCountIdx])
}
}
bucketIdx++
bucketCountIdx++
}
}
if bucketCountIdx != len(originBuckets) {
return nil, nil, fmt.Errorf("spans need %d buckets, have %d buckets: %w", bucketCountIdx, len(originBuckets), ErrHistogramSpansBucketsMismatch)
}
return targetSpans, targetBuckets, nil
}
// mustReduceResolution works like reduceResolution, but panics instead of
// returning an error. Use mustReduceResolution if you are sure that the spans
// and buckets are valid.
func mustReduceResolution[IBC InternalBucketCount](
originSpans []Span,
originBuckets []IBC,
originSchema,
targetSchema int32,
deltaBuckets bool,
inplace bool,
) ([]Span, []IBC) {
targetSpans, targetBuckets, err := reduceResolution(
originSpans, originBuckets, originSchema, targetSchema, deltaBuckets, inplace,
)
if err != nil {
panic(err)
}
return targetSpans, targetBuckets
}

View file

@ -142,7 +142,7 @@ func TestReduceResolutionHistogram(t *testing.T) {
for _, tc := range cases {
spansCopy, bucketsCopy := slices.Clone(tc.spans), slices.Clone(tc.buckets)
spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, false)
spans, buckets := mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, false)
require.Equal(t, tc.expectedSpans, spans)
require.Equal(t, tc.expectedBuckets, buckets)
// Verify inputs were not mutated:
@ -151,7 +151,7 @@ func TestReduceResolutionHistogram(t *testing.T) {
// Output slices reuse input slices:
const inplace = true
spans, buckets = reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, inplace)
spans, buckets = mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, inplace)
require.Equal(t, tc.expectedSpans, spans)
require.Equal(t, tc.expectedBuckets, buckets)
// Verify inputs were mutated which is now expected:
@ -190,7 +190,7 @@ func TestReduceResolutionFloatHistogram(t *testing.T) {
for _, tc := range cases {
spansCopy, bucketsCopy := slices.Clone(tc.spans), slices.Clone(tc.buckets)
spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, false)
spans, buckets := mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, false)
require.Equal(t, tc.expectedSpans, spans)
require.Equal(t, tc.expectedBuckets, buckets)
// Verify inputs were not mutated:
@ -199,7 +199,7 @@ func TestReduceResolutionFloatHistogram(t *testing.T) {
// Output slices reuse input slices:
const inplace = true
spans, buckets = reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, inplace)
spans, buckets = mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, inplace)
require.Equal(t, tc.expectedSpans, spans)
require.Equal(t, tc.expectedBuckets, buckets)
// Verify inputs were mutated which is now expected:

View file

@ -14,6 +14,7 @@
package histogram
import (
"errors"
"fmt"
"math"
"slices"
@ -617,26 +618,37 @@ func (c *cumulativeBucketIterator) At() Bucket[uint64] {
}
// ReduceResolution reduces the histogram's spans, buckets into target schema.
// The target schema must be smaller than the current histogram's schema.
// This will panic if the histogram has custom buckets or if the target schema is
// a custom buckets schema.
func (h *Histogram) ReduceResolution(targetSchema int32) *Histogram {
// An error is returned in the following cases:
// - The target schema is not smaller than the current histogram's schema.
// - The histogram has custom buckets.
// - The target schema is a custom buckets schema.
// - Any spans have an invalid offset.
// - The spans are inconsistent with the number of buckets.
func (h *Histogram) ReduceResolution(targetSchema int32) error {
// Note that the follow three returns are not returning a
// histogram.Error because they are programming errors.
if h.UsesCustomBuckets() {
panic("cannot reduce resolution when there are custom buckets")
return errors.New("cannot reduce resolution when there are custom buckets")
}
if IsCustomBucketsSchema(targetSchema) {
panic("cannot reduce resolution to custom buckets schema")
return errors.New("cannot reduce resolution to custom buckets schema")
}
if targetSchema >= h.Schema {
panic(fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema))
return fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema)
}
h.PositiveSpans, h.PositiveBuckets = reduceResolution(
var err error
if h.PositiveSpans, h.PositiveBuckets, err = reduceResolution(
h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, true, true,
)
h.NegativeSpans, h.NegativeBuckets = reduceResolution(
); err != nil {
return err
}
if h.NegativeSpans, h.NegativeBuckets, err = reduceResolution(
h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, true, true,
)
); err != nil {
return err
}
h.Schema = targetSchema
return h
return nil
}

View file

@ -1719,14 +1719,16 @@ func BenchmarkHistogramValidation(b *testing.B) {
func TestHistogramReduceResolution(t *testing.T) {
tcs := map[string]struct {
origin *Histogram
target *Histogram
origin *Histogram
targetSchema int32
target *Histogram
errorMsg string
}{
"valid histogram": {
origin: &Histogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: 0, Length: 4},
{Offset: -2, Length: 4},
{Offset: 0, Length: 0},
{Offset: 3, Length: 2},
},
@ -1738,10 +1740,11 @@ func TestHistogramReduceResolution(t *testing.T) {
},
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0},
},
targetSchema: -1,
target: &Histogram{
Schema: -1,
PositiveSpans: []Span{
{Offset: 0, Length: 3},
{Offset: -1, Length: 3},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{1, 3, -2, 0},
@ -1752,12 +1755,58 @@ func TestHistogramReduceResolution(t *testing.T) {
NegativeBuckets: []int64{1, 3, -2, 0},
},
},
"not enough buckets": {
origin: &Histogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: -2, Length: 4},
{Offset: 0, Length: 0},
{Offset: 3, Length: 2},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1},
},
targetSchema: -1,
errorMsg: "have 5 buckets but spans need more: histogram spans specify different number of buckets than provided",
},
"too many buckets": {
origin: &Histogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: -2, Length: 4},
{Offset: 0, Length: 0},
{Offset: 3, Length: 2},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 3},
},
targetSchema: -1,
errorMsg: "spans need 6 buckets, have 7 buckets: histogram spans specify different number of buckets than provided",
},
"negative offset": {
origin: &Histogram{
Schema: 0,
PositiveSpans: []Span{
{Offset: -2, Length: 4},
{Offset: -1, Length: 0},
{Offset: 3, Length: 2},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
},
targetSchema: -1,
errorMsg: "span number 2 with offset -1: histogram has a span whose offset is negative",
},
}
for _, tc := range tcs {
target := tc.origin.ReduceResolution(tc.target.Schema)
require.Equal(t, tc.target, target)
// Check that receiver histogram was mutated:
require.Equal(t, tc.target, tc.origin)
for tn, tc := range tcs {
t.Run(tn, func(t *testing.T) {
err := tc.origin.ReduceResolution(tc.targetSchema)
if tc.errorMsg != "" {
require.Equal(t, tc.errorMsg, err.Error())
// The returned error should be a histogram.Error.
require.ErrorAs(t, err, &Error{})
return
}
require.NoError(t, err)
require.Equal(t, tc.target, tc.origin)
})
}
}

View file

@ -352,7 +352,7 @@ func (p *NHCBParser) swapExemplars() {
}
// processNHCB converts the collated classic histogram series to NHCB and caches the info
// to be returned to callers. Retruns true if the conversion was successful.
// to be returned to callers. Returns true if the conversion was successful.
func (p *NHCBParser) processNHCB() bool {
if p.state != stateCollecting {
return false

View file

@ -389,6 +389,7 @@ type bucketLimitAppender struct {
}
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
var err error
if h != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
@ -399,7 +400,9 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe
if h.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
h = h.ReduceResolution(h.Schema - 1)
if err = h.ReduceResolution(h.Schema - 1); err != nil {
return 0, err
}
}
}
if fh != nil {
@ -412,11 +415,12 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe
if fh.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
fh = fh.ReduceResolution(fh.Schema - 1)
if err = fh.ReduceResolution(fh.Schema - 1); err != nil {
return 0, err
}
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
if err != nil {
if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil {
return 0, err
}
return ref, nil
@ -429,18 +433,22 @@ type maxSchemaAppender struct {
}
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
var err error
if h != nil {
if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
h = h.ReduceResolution(app.maxSchema)
if err = h.ReduceResolution(app.maxSchema); err != nil {
return 0, err
}
}
}
if fh != nil {
if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
fh = fh.ReduceResolution(app.maxSchema)
if err = fh.ReduceResolution(app.maxSchema); err != nil {
return 0, err
}
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
if err != nil {
if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil {
return 0, err
}
return ref, nil

View file

@ -389,6 +389,11 @@ type concreteSeriesIterator struct {
curValType chunkenc.ValueType
series *concreteSeries
err error
// These are pre-filled with the current model histogram if curValType
// is ValHistogram or ValFloatHistogram, respectively.
curH *histogram.Histogram
curFH *histogram.FloatHistogram
}
func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator {
@ -461,9 +466,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
c.curValType = chunkenc.ValHistogram
}
if c.curValType == chunkenc.ValHistogram {
h := &c.series.histograms[c.histogramsCur]
c.curValType = getHistogramValType(h)
c.err = validateHistogramSchema(h)
c.setCurrentHistogram()
}
if c.err != nil {
c.curValType = chunkenc.ValNone
@ -471,18 +474,57 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
return c.curValType
}
func validateHistogramSchema(h *prompb.Histogram) error {
if histogram.IsKnownSchema(h.Schema) {
return nil
}
return histogram.UnknownSchemaError(h.Schema)
}
// setCurrentHistogram pre-fills either the curH or the curFH field with a
// converted model histogram and sets c.curValType accordingly. It validates the
// histogram and sets c.err accordingly. This all has to be done in Seek() and
// Next() already so that we know if the histogram we got from the remote-read
// source is valid or not before we allow the AtHistogram()/AtFloatHistogram()
// call.
func (c *concreteSeriesIterator) setCurrentHistogram() {
pbH := c.series.histograms[c.histogramsCur]
func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType {
if h.IsFloatHistogram() {
return chunkenc.ValFloatHistogram
// Basic schema check first.
schema := pbH.Schema
if !histogram.IsKnownSchema(schema) {
c.err = histogram.UnknownSchemaError(schema)
return
}
return chunkenc.ValHistogram
if pbH.IsFloatHistogram() {
c.curValType = chunkenc.ValFloatHistogram
mFH := pbH.ToFloatHistogram()
if mFH.Schema > histogram.ExponentialSchemaMax && mFH.Schema <= histogram.ExponentialSchemaMaxReserved {
// This is a very slow path, but it should only happen if the
// sample is from a newer Prometheus version that supports higher
// resolution.
if err := mFH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
c.err = err
return
}
}
if err := mFH.Validate(); err != nil {
c.err = err
return
}
c.curFH = mFH
return
}
c.curValType = chunkenc.ValHistogram
mH := pbH.ToIntHistogram()
if mH.Schema > histogram.ExponentialSchemaMax && mH.Schema <= histogram.ExponentialSchemaMaxReserved {
// This is a very slow path, but it should only happen if the
// sample is from a newer Prometheus version that supports higher
// resolution.
if err := mH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
c.err = err
return
}
}
if err := mH.Validate(); err != nil {
c.err = err
return
}
c.curH = mH
}
// At implements chunkenc.Iterator.
@ -499,31 +541,19 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist
if c.curValType != chunkenc.ValHistogram {
panic("iterator is not on an integer histogram sample")
}
h := c.series.histograms[c.histogramsCur]
mh := h.ToIntHistogram()
if mh.Schema > histogram.ExponentialSchemaMax && mh.Schema <= histogram.ExponentialSchemaMaxReserved {
// This is a very slow path, but it should only happen if the
// sample is from a newer Prometheus version that supports higher
// resolution.
mh.ReduceResolution(histogram.ExponentialSchemaMax)
}
return h.Timestamp, mh
return c.series.histograms[c.histogramsCur].Timestamp, c.curH
}
// AtFloatHistogram implements chunkenc.Iterator.
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram {
fh := c.series.histograms[c.histogramsCur]
mfh := fh.ToFloatHistogram() // integer will be auto-converted.
if mfh.Schema > histogram.ExponentialSchemaMax && mfh.Schema <= histogram.ExponentialSchemaMaxReserved {
// This is a very slow path, but it should only happen if the
// sample is from a newer Prometheus version that supports higher
// resolution.
mfh.ReduceResolution(histogram.ExponentialSchemaMax)
}
return fh.Timestamp, mfh
switch c.curValType {
case chunkenc.ValFloatHistogram:
return c.series.histograms[c.histogramsCur].Timestamp, c.curFH
case chunkenc.ValHistogram:
return c.series.histograms[c.histogramsCur].Timestamp, c.curH.ToFloat(nil)
default:
panic("iterator is not on a histogram sample")
}
panic("iterator is not on a histogram sample")
}
// AtT implements chunkenc.Iterator.
@ -571,9 +601,7 @@ func (c *concreteSeriesIterator) Next() chunkenc.ValueType {
}
if c.curValType == chunkenc.ValHistogram {
h := &c.series.histograms[c.histogramsCur]
c.curValType = getHistogramValType(h)
c.err = validateHistogramSchema(h)
c.setCurrentHistogram()
}
if c.err != nil {
c.curValType = chunkenc.ValNone

View file

@ -546,7 +546,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
require.Equal(t, chunkenc.ValNone, it.Seek(1))
}
func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) {
func TestConcreteSeriesIterator_HistogramSamplesWithInvalidSchema(t *testing.T) {
for _, schema := range []int32{-100, 100} {
t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) {
h := prompb.FromIntHistogram(2, &testHistogram)
@ -591,6 +591,47 @@ func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) {
}
}
func TestConcreteSeriesIterator_HistogramSamplesWithMissingBucket(t *testing.T) {
mh := testHistogram.Copy()
mh.PositiveSpans = []histogram.Span{{Offset: 0, Length: 2}}
h := prompb.FromIntHistogram(2, mh)
fh := prompb.FromFloatHistogram(4, mh.ToFloat(nil))
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
floats: []prompb.Sample{
{Value: 1, Timestamp: 0},
{Value: 2, Timestamp: 3},
},
histograms: []prompb.Histogram{
h,
fh,
},
}
it := series.Iterator(nil)
require.Equal(t, chunkenc.ValFloat, it.Next())
require.Equal(t, chunkenc.ValNone, it.Next())
require.Error(t, it.Err())
require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch)
it = series.Iterator(it)
require.Equal(t, chunkenc.ValFloat, it.Next())
require.Equal(t, chunkenc.ValNone, it.Next())
require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch)
it = series.Iterator(it)
require.Equal(t, chunkenc.ValNone, it.Seek(1))
require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch)
it = series.Iterator(it)
require.Equal(t, chunkenc.ValFloat, it.Seek(3))
require.Equal(t, chunkenc.ValNone, it.Next())
require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch)
it = series.Iterator(it)
require.Equal(t, chunkenc.ValNone, it.Seek(4))
require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch)
}
func TestConcreteSeriesIterator_ReducesHighResolutionHistograms(t *testing.T) {
for _, schema := range []int32{9, 52} {
t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) {

View file

@ -697,19 +697,23 @@ func (app *remoteWriteAppender) Append(ref storage.SeriesRef, lset labels.Labels
}
func (app *remoteWriteAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
var err error
if t > app.maxTime {
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
}
if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax {
h = h.ReduceResolution(histogram.ExponentialSchemaMax)
if err = h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
return 0, err
}
}
if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax {
fh = fh.ReduceResolution(histogram.ExponentialSchemaMax)
if err = fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
return 0, err
}
}
ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh)
if err != nil {
if ref, err = app.Appender.AppendHistogram(ref, l, t, h, fh); err != nil {
return 0, err
}
return ref, nil

View file

@ -884,7 +884,14 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram)
// chunk is from a newer Prometheus version that supports higher
// resolution.
fh = fh.Copy()
fh.ReduceResolution(histogram.ExponentialSchemaMax)
if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
// With the checks above, this can only happen
// with invalid data in a chunk. As this is a
// rare edge case of a rare edge case, we'd
// rather not create all the plumbing to handle
// this error gracefully.
panic(err)
}
}
return it.t, fh
}
@ -915,7 +922,13 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram)
// This is a very slow path, but it should only happen if the
// chunk is from a newer Prometheus version that supports higher
// resolution.
fh.ReduceResolution(histogram.ExponentialSchemaMax)
if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
// With the checks above, this can only happen with
// invalid data in a chunk. As this is a rare edge case
// of a rare edge case, we'd rather not create all the
// plumbing to handle this error gracefully.
panic(err)
}
}
return it.t, fh

View file

@ -939,7 +939,14 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog
// chunk is from a newer Prometheus version that supports higher
// resolution.
h = h.Copy()
h.ReduceResolution(histogram.ExponentialSchemaMax)
if err := h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
// With the checks above, this can only happen
// with invalid data in a chunk. As this is a
// rare edge case of a rare edge case, we'd
// rather not create all the plumbing to handle
// this error gracefully.
panic(err)
}
}
return it.t, h
}
@ -970,7 +977,13 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog
// This is a very slow path, but it should only happen if the
// chunk is from a newer Prometheus version that supports higher
// resolution.
h.ReduceResolution(histogram.ExponentialSchemaMax)
if err := h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
// With the checks above, this can only happen with
// invalid data in a chunk. As this is a rare edge case
// of a rare edge case, we'd rather not create all the
// plumbing to handle this error gracefully.
panic(err)
}
}
return it.t, h
@ -1000,7 +1013,14 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int
// chunk is from a newer Prometheus version that supports higher
// resolution.
fh = fh.Copy()
fh.ReduceResolution(histogram.ExponentialSchemaMax)
if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
// With the checks above, this can only happen
// with invalid data in a chunk. As this is a
// rare edge case of a rare edge case, we'd
// rather not create all the plumbing to handle
// this error gracefully.
panic(err)
}
}
return it.t, fh
}
@ -1039,7 +1059,13 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int
// This is a very slow path, but it should only happen if the
// chunk is from a newer Prometheus version that supports higher
// resolution.
fh.ReduceResolution(histogram.ExponentialSchemaMax)
if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
// With the checks above, this can only happen with
// invalid data in a chunk. As this is a rare edge case
// of a rare edge case, we'd rather not create all the
// plumbing to handle this error gracefully.
panic(err)
}
}
return it.t, fh

View file

@ -475,7 +475,9 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample)
// This is a very slow path, but it should only happen if the
// record is from a newer Prometheus version that supports higher
// resolution.
rh.H.ReduceResolution(histogram.ExponentialSchemaMax)
if err := rh.H.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
return nil, fmt.Errorf("error reducing resolution of histogram #%d: %w", len(histograms)+1, err)
}
}
histograms = append(histograms, rh)
@ -579,7 +581,9 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr
// This is a very slow path, but it should only happen if the
// record is from a newer Prometheus version that supports higher
// resolution.
rh.FH.ReduceResolution(histogram.ExponentialSchemaMax)
if err := rh.FH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil {
return nil, fmt.Errorf("error reducing resolution of histogram #%d: %w", len(histograms)+1, err)
}
}
histograms = append(histograms, rh)