From b8d19543b8ddaac0c528f73db308a5771d423304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Rabenstein?= Date: Fri, 21 Nov 2025 00:22:24 +0100 Subject: [PATCH] Add histogram validation in remote-read and during reducing resolution (#17561) ReduceResolution is currently called before validation during ingestion. This will cause a panic if there are not enough buckets in the histogram. If there are too many buckets, the spurious buckets are ignored, and therefore the error in the input histogram is masked. Furthermore, invalid negative offsets might cause problems, too. Therefore, we need to do some minimal validation in reduceResolution. Fortunately, it is easy and shouldn't slow things down. Sadly, it requires to return errors, which triggers a bunch of code changes. Even here is a bright side, we can get rud of a few panics. (Remember: Don't panic!) In different news, we haven't done a full validation of histograms read via remote-read. This is not so much a security concern (as you can throw off Prometheus easily by feeding it bogus data via remote-read) but more that remote-read sources might be makeshift and could accidentally create invalid histograms. We really don't want to panic in that case. So this commit does not only add a check of the spans and buckets as needed for resolution reduction but also a full validation during remote-read. Signed-off-by: beorn7 --- model/histogram/float_histogram.go | 55 ++++++++----- model/histogram/float_histogram_test.go | 67 +++++++++++++--- model/histogram/generic.go | 37 ++++++++- model/histogram/generic_test.go | 8 +- model/histogram/histogram.go | 36 ++++++--- model/histogram/histogram_test.go | 67 +++++++++++++--- model/textparse/nhcbparse.go | 2 +- scrape/target.go | 24 ++++-- storage/remote/codec.go | 100 +++++++++++++++--------- storage/remote/codec_test.go | 43 +++++++++- storage/remote/write_handler.go | 12 ++- tsdb/chunkenc/float_histogram.go | 17 +++- tsdb/chunkenc/histogram.go | 34 +++++++- tsdb/record/record.go | 8 +- 14 files changed, 395 insertions(+), 115 deletions(-) diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 28f35572c2..91fcac1cfb 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -164,8 +164,8 @@ func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram { Sum: h.Sum, } - c.PositiveSpans, c.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, false) - c.NegativeSpans, c.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, false) + c.PositiveSpans, c.PositiveBuckets = mustReduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, false) + c.NegativeSpans, c.NegativeBuckets = mustReduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, false) return &c } @@ -393,13 +393,13 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte switch { case other.Schema < h.Schema: - hPositiveSpans, hPositiveBuckets = reduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) - hNegativeSpans, hNegativeBuckets = reduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) + hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) + hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) h.Schema = other.Schema case other.Schema > h.Schema: - otherPositiveSpans, otherPositiveBuckets = reduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) - otherNegativeSpans, otherNegativeBuckets = reduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) + otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) + otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) } h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets) @@ -459,12 +459,12 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte switch { case other.Schema < h.Schema: - hPositiveSpans, hPositiveBuckets = reduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) - hNegativeSpans, hNegativeBuckets = reduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) + hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) + hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) h.Schema = other.Schema case other.Schema > h.Schema: - otherPositiveSpans, otherPositiveBuckets = reduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) - otherNegativeSpans, otherNegativeBuckets = reduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) + otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) + otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) } h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets) @@ -1582,25 +1582,40 @@ func addCustomBucketsWithMismatches( } // ReduceResolution reduces the float histogram's spans, buckets into target schema. -// The target schema must be smaller than the current float histogram's schema. -// This will panic if the histogram has custom buckets or if the target schema is -// a custom buckets schema. -func (h *FloatHistogram) ReduceResolution(targetSchema int32) *FloatHistogram { +// An error is returned in the following cases: +// - The target schema is not smaller than the current histogram's schema. +// - The histogram has custom buckets. +// - The target schema is a custom buckets schema. +// - Any spans have an invalid offset. +// - The spans are inconsistent with the number of buckets. +func (h *FloatHistogram) ReduceResolution(targetSchema int32) error { + // Note that the follow three returns are not returning a + // histogram.Error because they are programming errors. if h.UsesCustomBuckets() { - panic("cannot reduce resolution when there are custom buckets") + return errors.New("cannot reduce resolution when there are custom buckets") } if IsCustomBucketsSchema(targetSchema) { - panic("cannot reduce resolution to custom buckets schema") + return errors.New("cannot reduce resolution to custom buckets schema") } if targetSchema >= h.Schema { - panic(fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema)) + return fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema) } - h.PositiveSpans, h.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, true) - h.NegativeSpans, h.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, true) + var err error + + if h.PositiveSpans, h.PositiveBuckets, err = reduceResolution( + h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, true, + ); err != nil { + return err + } + if h.NegativeSpans, h.NegativeBuckets, err = reduceResolution( + h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, true, + ); err != nil { + return err + } h.Schema = targetSchema - return h + return nil } // checkSchemaAndBounds checks if two histograms are compatible because they diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index ac339f152e..e79f5a0f49 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -4141,14 +4141,16 @@ func createRandomSpans(rng *rand.Rand, spanNum int32) ([]Span, []float64) { func TestFloatHistogramReduceResolution(t *testing.T) { tcs := map[string]struct { - origin *FloatHistogram - target *FloatHistogram + origin *FloatHistogram + targetSchema int32 + target *FloatHistogram + errorMsg string }{ "valid float histogram": { origin: &FloatHistogram{ Schema: 0, PositiveSpans: []Span{ - {Offset: 0, Length: 4}, + {Offset: -2, Length: 4}, {Offset: 0, Length: 0}, {Offset: 3, Length: 2}, }, @@ -4160,10 +4162,11 @@ func TestFloatHistogramReduceResolution(t *testing.T) { }, NegativeBuckets: []float64{1, 3, 1, 2, 1, 1}, }, + targetSchema: -1, target: &FloatHistogram{ Schema: -1, PositiveSpans: []Span{ - {Offset: 0, Length: 3}, + {Offset: -1, Length: 3}, {Offset: 1, Length: 1}, }, PositiveBuckets: []float64{1, 4, 2, 2}, @@ -4174,12 +4177,58 @@ func TestFloatHistogramReduceResolution(t *testing.T) { NegativeBuckets: []float64{1, 4, 2, 2}, }, }, + "not enough buckets": { + origin: &FloatHistogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1}, + }, + targetSchema: -1, + errorMsg: "have 5 buckets but spans need more: histogram spans specify different number of buckets than provided", + }, + "too many buckets": { + origin: &FloatHistogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 5}, + }, + targetSchema: -1, + errorMsg: "spans need 6 buckets, have 7 buckets: histogram spans specify different number of buckets than provided", + }, + "negative offset": { + origin: &FloatHistogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: -1, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1}, + }, + targetSchema: -1, + errorMsg: "span number 2 with offset -1: histogram has a span whose offset is negative", + }, } - for _, tc := range tcs { - target := tc.origin.ReduceResolution(tc.target.Schema) - require.Equal(t, tc.target, target) - // Check that receiver histogram was mutated: - require.Equal(t, tc.target, tc.origin) + for tn, tc := range tcs { + t.Run(tn, func(t *testing.T) { + err := tc.origin.ReduceResolution(tc.targetSchema) + if tc.errorMsg != "" { + require.Equal(t, tc.errorMsg, err.Error()) + // The returned error should be a histogram.Error. + require.ErrorAs(t, err, &Error{}) + return + } + require.NoError(t, err) + require.Equal(t, tc.target, tc.origin) + }) } } diff --git a/model/histogram/generic.go b/model/histogram/generic.go index cd385407d5..649db769c7 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -738,6 +738,8 @@ var exponentialBounds = [][]float64{ // deltas. Set it to false if the buckets contain absolute counts. // Set inplace to true to reuse input slices and avoid allocations (otherwise // new slices will be allocated for result). +// The functions returns an error if there are too many or too few buckets for the spans +// or if any span except the first has a negative offset. func reduceResolution[IBC InternalBucketCount]( originSpans []Span, originBuckets []IBC, @@ -745,7 +747,7 @@ func reduceResolution[IBC InternalBucketCount]( targetSchema int32, deltaBuckets bool, inplace bool, -) ([]Span, []IBC) { +) ([]Span, []IBC, error) { var ( targetSpans []Span // The spans in the target schema. targetBuckets []IBC // The bucket counts in the target schema. @@ -764,10 +766,18 @@ func reduceResolution[IBC InternalBucketCount]( targetBuckets = originBuckets[:0] } - for _, span := range originSpans { + for n, span := range originSpans { + if n > 0 && span.Offset < 0 { + return nil, nil, fmt.Errorf("span number %d with offset %d: %w", n+1, span.Offset, ErrHistogramSpanNegativeOffset) + } // Determine the index of the first bucket in this span. bucketIdx += span.Offset for j := 0; j < int(span.Length); j++ { + // Protect against too few buckets in the origin. + if bucketCountIdx >= len(originBuckets) { + return nil, nil, fmt.Errorf("have %d buckets but spans need more: %w", len(originBuckets), ErrHistogramSpansBucketsMismatch) + } + // Determine the index of the bucket in the target schema from the index in the original schema. targetBucketIdx = targetIdx(bucketIdx, originSchema, targetSchema) @@ -826,12 +836,33 @@ func reduceResolution[IBC InternalBucketCount]( targetBuckets = append(targetBuckets, originBuckets[bucketCountIdx]) } } - bucketIdx++ bucketCountIdx++ } } + if bucketCountIdx != len(originBuckets) { + return nil, nil, fmt.Errorf("spans need %d buckets, have %d buckets: %w", bucketCountIdx, len(originBuckets), ErrHistogramSpansBucketsMismatch) + } + return targetSpans, targetBuckets, nil +} +// mustReduceResolution works like reduceResolution, but panics instead of +// returning an error. Use mustReduceResolution if you are sure that the spans +// and buckets are valid. +func mustReduceResolution[IBC InternalBucketCount]( + originSpans []Span, + originBuckets []IBC, + originSchema, + targetSchema int32, + deltaBuckets bool, + inplace bool, +) ([]Span, []IBC) { + targetSpans, targetBuckets, err := reduceResolution( + originSpans, originBuckets, originSchema, targetSchema, deltaBuckets, inplace, + ) + if err != nil { + panic(err) + } return targetSpans, targetBuckets } diff --git a/model/histogram/generic_test.go b/model/histogram/generic_test.go index 1651830e9d..54324beaff 100644 --- a/model/histogram/generic_test.go +++ b/model/histogram/generic_test.go @@ -142,7 +142,7 @@ func TestReduceResolutionHistogram(t *testing.T) { for _, tc := range cases { spansCopy, bucketsCopy := slices.Clone(tc.spans), slices.Clone(tc.buckets) - spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, false) + spans, buckets := mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, false) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were not mutated: @@ -151,7 +151,7 @@ func TestReduceResolutionHistogram(t *testing.T) { // Output slices reuse input slices: const inplace = true - spans, buckets = reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, inplace) + spans, buckets = mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, inplace) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were mutated which is now expected: @@ -190,7 +190,7 @@ func TestReduceResolutionFloatHistogram(t *testing.T) { for _, tc := range cases { spansCopy, bucketsCopy := slices.Clone(tc.spans), slices.Clone(tc.buckets) - spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, false) + spans, buckets := mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, false) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were not mutated: @@ -199,7 +199,7 @@ func TestReduceResolutionFloatHistogram(t *testing.T) { // Output slices reuse input slices: const inplace = true - spans, buckets = reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, inplace) + spans, buckets = mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, inplace) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were mutated which is now expected: diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 959df4c87a..5fc68ef9d0 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -14,6 +14,7 @@ package histogram import ( + "errors" "fmt" "math" "slices" @@ -617,26 +618,37 @@ func (c *cumulativeBucketIterator) At() Bucket[uint64] { } // ReduceResolution reduces the histogram's spans, buckets into target schema. -// The target schema must be smaller than the current histogram's schema. -// This will panic if the histogram has custom buckets or if the target schema is -// a custom buckets schema. -func (h *Histogram) ReduceResolution(targetSchema int32) *Histogram { +// An error is returned in the following cases: +// - The target schema is not smaller than the current histogram's schema. +// - The histogram has custom buckets. +// - The target schema is a custom buckets schema. +// - Any spans have an invalid offset. +// - The spans are inconsistent with the number of buckets. +func (h *Histogram) ReduceResolution(targetSchema int32) error { + // Note that the follow three returns are not returning a + // histogram.Error because they are programming errors. if h.UsesCustomBuckets() { - panic("cannot reduce resolution when there are custom buckets") + return errors.New("cannot reduce resolution when there are custom buckets") } if IsCustomBucketsSchema(targetSchema) { - panic("cannot reduce resolution to custom buckets schema") + return errors.New("cannot reduce resolution to custom buckets schema") } if targetSchema >= h.Schema { - panic(fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema)) + return fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema) } - h.PositiveSpans, h.PositiveBuckets = reduceResolution( + var err error + + if h.PositiveSpans, h.PositiveBuckets, err = reduceResolution( h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, true, true, - ) - h.NegativeSpans, h.NegativeBuckets = reduceResolution( + ); err != nil { + return err + } + if h.NegativeSpans, h.NegativeBuckets, err = reduceResolution( h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, true, true, - ) + ); err != nil { + return err + } h.Schema = targetSchema - return h + return nil } diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index e4c6ce683b..ae17f9be37 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -1719,14 +1719,16 @@ func BenchmarkHistogramValidation(b *testing.B) { func TestHistogramReduceResolution(t *testing.T) { tcs := map[string]struct { - origin *Histogram - target *Histogram + origin *Histogram + targetSchema int32 + target *Histogram + errorMsg string }{ "valid histogram": { origin: &Histogram{ Schema: 0, PositiveSpans: []Span{ - {Offset: 0, Length: 4}, + {Offset: -2, Length: 4}, {Offset: 0, Length: 0}, {Offset: 3, Length: 2}, }, @@ -1738,10 +1740,11 @@ func TestHistogramReduceResolution(t *testing.T) { }, NegativeBuckets: []int64{1, 2, -2, 1, -1, 0}, }, + targetSchema: -1, target: &Histogram{ Schema: -1, PositiveSpans: []Span{ - {Offset: 0, Length: 3}, + {Offset: -1, Length: 3}, {Offset: 1, Length: 1}, }, PositiveBuckets: []int64{1, 3, -2, 0}, @@ -1752,12 +1755,58 @@ func TestHistogramReduceResolution(t *testing.T) { NegativeBuckets: []int64{1, 3, -2, 0}, }, }, + "not enough buckets": { + origin: &Histogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1}, + }, + targetSchema: -1, + errorMsg: "have 5 buckets but spans need more: histogram spans specify different number of buckets than provided", + }, + "too many buckets": { + origin: &Histogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 3}, + }, + targetSchema: -1, + errorMsg: "spans need 6 buckets, have 7 buckets: histogram spans specify different number of buckets than provided", + }, + "negative offset": { + origin: &Histogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: -1, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0}, + }, + targetSchema: -1, + errorMsg: "span number 2 with offset -1: histogram has a span whose offset is negative", + }, } - for _, tc := range tcs { - target := tc.origin.ReduceResolution(tc.target.Schema) - require.Equal(t, tc.target, target) - // Check that receiver histogram was mutated: - require.Equal(t, tc.target, tc.origin) + for tn, tc := range tcs { + t.Run(tn, func(t *testing.T) { + err := tc.origin.ReduceResolution(tc.targetSchema) + if tc.errorMsg != "" { + require.Equal(t, tc.errorMsg, err.Error()) + // The returned error should be a histogram.Error. + require.ErrorAs(t, err, &Error{}) + return + } + require.NoError(t, err) + require.Equal(t, tc.target, tc.origin) + }) } } diff --git a/model/textparse/nhcbparse.go b/model/textparse/nhcbparse.go index ab821f0e63..79441e1f75 100644 --- a/model/textparse/nhcbparse.go +++ b/model/textparse/nhcbparse.go @@ -352,7 +352,7 @@ func (p *NHCBParser) swapExemplars() { } // processNHCB converts the collated classic histogram series to NHCB and caches the info -// to be returned to callers. Retruns true if the conversion was successful. +// to be returned to callers. Returns true if the conversion was successful. func (p *NHCBParser) processNHCB() bool { if p.state != stateCollecting { return false diff --git a/scrape/target.go b/scrape/target.go index 563fe33f82..2aabff20e2 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -389,6 +389,7 @@ type bucketLimitAppender struct { } func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + var err error if h != nil { // Return with an early error if the histogram has too many buckets and the // schema is not exponential, in which case we can't reduce the resolution. @@ -399,7 +400,9 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe if h.Schema <= histogram.ExponentialSchemaMin { return 0, errBucketLimit } - h = h.ReduceResolution(h.Schema - 1) + if err = h.ReduceResolution(h.Schema - 1); err != nil { + return 0, err + } } } if fh != nil { @@ -412,11 +415,12 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe if fh.Schema <= histogram.ExponentialSchemaMin { return 0, errBucketLimit } - fh = fh.ReduceResolution(fh.Schema - 1) + if err = fh.ReduceResolution(fh.Schema - 1); err != nil { + return 0, err + } } } - ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) - if err != nil { + if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil { return 0, err } return ref, nil @@ -429,18 +433,22 @@ type maxSchemaAppender struct { } func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + var err error if h != nil { if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema { - h = h.ReduceResolution(app.maxSchema) + if err = h.ReduceResolution(app.maxSchema); err != nil { + return 0, err + } } } if fh != nil { if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema { - fh = fh.ReduceResolution(app.maxSchema) + if err = fh.ReduceResolution(app.maxSchema); err != nil { + return 0, err + } } } - ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) - if err != nil { + if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil { return 0, err } return ref, nil diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 7e21909354..059d5e66ce 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -389,6 +389,11 @@ type concreteSeriesIterator struct { curValType chunkenc.ValueType series *concreteSeries err error + + // These are pre-filled with the current model histogram if curValType + // is ValHistogram or ValFloatHistogram, respectively. + curH *histogram.Histogram + curFH *histogram.FloatHistogram } func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator { @@ -461,9 +466,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { c.curValType = chunkenc.ValHistogram } if c.curValType == chunkenc.ValHistogram { - h := &c.series.histograms[c.histogramsCur] - c.curValType = getHistogramValType(h) - c.err = validateHistogramSchema(h) + c.setCurrentHistogram() } if c.err != nil { c.curValType = chunkenc.ValNone @@ -471,18 +474,57 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { return c.curValType } -func validateHistogramSchema(h *prompb.Histogram) error { - if histogram.IsKnownSchema(h.Schema) { - return nil - } - return histogram.UnknownSchemaError(h.Schema) -} +// setCurrentHistogram pre-fills either the curH or the curFH field with a +// converted model histogram and sets c.curValType accordingly. It validates the +// histogram and sets c.err accordingly. This all has to be done in Seek() and +// Next() already so that we know if the histogram we got from the remote-read +// source is valid or not before we allow the AtHistogram()/AtFloatHistogram() +// call. +func (c *concreteSeriesIterator) setCurrentHistogram() { + pbH := c.series.histograms[c.histogramsCur] -func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType { - if h.IsFloatHistogram() { - return chunkenc.ValFloatHistogram + // Basic schema check first. + schema := pbH.Schema + if !histogram.IsKnownSchema(schema) { + c.err = histogram.UnknownSchemaError(schema) + return } - return chunkenc.ValHistogram + + if pbH.IsFloatHistogram() { + c.curValType = chunkenc.ValFloatHistogram + mFH := pbH.ToFloatHistogram() + if mFH.Schema > histogram.ExponentialSchemaMax && mFH.Schema <= histogram.ExponentialSchemaMaxReserved { + // This is a very slow path, but it should only happen if the + // sample is from a newer Prometheus version that supports higher + // resolution. + if err := mFH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + c.err = err + return + } + } + if err := mFH.Validate(); err != nil { + c.err = err + return + } + c.curFH = mFH + return + } + c.curValType = chunkenc.ValHistogram + mH := pbH.ToIntHistogram() + if mH.Schema > histogram.ExponentialSchemaMax && mH.Schema <= histogram.ExponentialSchemaMaxReserved { + // This is a very slow path, but it should only happen if the + // sample is from a newer Prometheus version that supports higher + // resolution. + if err := mH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + c.err = err + return + } + } + if err := mH.Validate(); err != nil { + c.err = err + return + } + c.curH = mH } // At implements chunkenc.Iterator. @@ -499,31 +541,19 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist if c.curValType != chunkenc.ValHistogram { panic("iterator is not on an integer histogram sample") } - h := c.series.histograms[c.histogramsCur] - mh := h.ToIntHistogram() - if mh.Schema > histogram.ExponentialSchemaMax && mh.Schema <= histogram.ExponentialSchemaMaxReserved { - // This is a very slow path, but it should only happen if the - // sample is from a newer Prometheus version that supports higher - // resolution. - mh.ReduceResolution(histogram.ExponentialSchemaMax) - } - return h.Timestamp, mh + return c.series.histograms[c.histogramsCur].Timestamp, c.curH } // AtFloatHistogram implements chunkenc.Iterator. func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram { - fh := c.series.histograms[c.histogramsCur] - mfh := fh.ToFloatHistogram() // integer will be auto-converted. - if mfh.Schema > histogram.ExponentialSchemaMax && mfh.Schema <= histogram.ExponentialSchemaMaxReserved { - // This is a very slow path, but it should only happen if the - // sample is from a newer Prometheus version that supports higher - // resolution. - mfh.ReduceResolution(histogram.ExponentialSchemaMax) - } - return fh.Timestamp, mfh + switch c.curValType { + case chunkenc.ValFloatHistogram: + return c.series.histograms[c.histogramsCur].Timestamp, c.curFH + case chunkenc.ValHistogram: + return c.series.histograms[c.histogramsCur].Timestamp, c.curH.ToFloat(nil) + default: + panic("iterator is not on a histogram sample") } - panic("iterator is not on a histogram sample") } // AtT implements chunkenc.Iterator. @@ -571,9 +601,7 @@ func (c *concreteSeriesIterator) Next() chunkenc.ValueType { } if c.curValType == chunkenc.ValHistogram { - h := &c.series.histograms[c.histogramsCur] - c.curValType = getHistogramValType(h) - c.err = validateHistogramSchema(h) + c.setCurrentHistogram() } if c.err != nil { c.curValType = chunkenc.ValNone diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index ce3a09b878..ba67ff33d9 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -546,7 +546,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Seek(1)) } -func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) { +func TestConcreteSeriesIterator_HistogramSamplesWithInvalidSchema(t *testing.T) { for _, schema := range []int32{-100, 100} { t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) { h := prompb.FromIntHistogram(2, &testHistogram) @@ -591,6 +591,47 @@ func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) { } } +func TestConcreteSeriesIterator_HistogramSamplesWithMissingBucket(t *testing.T) { + mh := testHistogram.Copy() + mh.PositiveSpans = []histogram.Span{{Offset: 0, Length: 2}} + h := prompb.FromIntHistogram(2, mh) + fh := prompb.FromFloatHistogram(4, mh.ToFloat(nil)) + series := &concreteSeries{ + labels: labels.FromStrings("foo", "bar"), + floats: []prompb.Sample{ + {Value: 1, Timestamp: 0}, + {Value: 2, Timestamp: 3}, + }, + histograms: []prompb.Histogram{ + h, + fh, + }, + } + it := series.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.Error(t, it.Err()) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValFloat, it.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValNone, it.Seek(1)) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValFloat, it.Seek(3)) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValNone, it.Seek(4)) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) +} + func TestConcreteSeriesIterator_ReducesHighResolutionHistograms(t *testing.T) { for _, schema := range []int32{9, 52} { t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) { diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 67c244167b..b95c85b6c4 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -697,19 +697,23 @@ func (app *remoteWriteAppender) Append(ref storage.SeriesRef, lset labels.Labels } func (app *remoteWriteAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + var err error if t > app.maxTime { return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) } if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax { - h = h.ReduceResolution(histogram.ExponentialSchemaMax) + if err = h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return 0, err + } } if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax { - fh = fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err = fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return 0, err + } } - ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh) - if err != nil { + if ref, err = app.Appender.AppendHistogram(ref, l, t, h, fh); err != nil { return 0, err } return ref, nil diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 8002dd0d4e..d960e835f2 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -884,7 +884,14 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) // chunk is from a newer Prometheus version that supports higher // resolution. fh = fh.Copy() - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen + // with invalid data in a chunk. As this is a + // rare edge case of a rare edge case, we'd + // rather not create all the plumbing to handle + // this error gracefully. + panic(err) + } } return it.t, fh } @@ -915,7 +922,13 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) // This is a very slow path, but it should only happen if the // chunk is from a newer Prometheus version that supports higher // resolution. - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen with + // invalid data in a chunk. As this is a rare edge case + // of a rare edge case, we'd rather not create all the + // plumbing to handle this error gracefully. + panic(err) + } } return it.t, fh diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index cc1d771235..be1c31ae76 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -939,7 +939,14 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog // chunk is from a newer Prometheus version that supports higher // resolution. h = h.Copy() - h.ReduceResolution(histogram.ExponentialSchemaMax) + if err := h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen + // with invalid data in a chunk. As this is a + // rare edge case of a rare edge case, we'd + // rather not create all the plumbing to handle + // this error gracefully. + panic(err) + } } return it.t, h } @@ -970,7 +977,13 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog // This is a very slow path, but it should only happen if the // chunk is from a newer Prometheus version that supports higher // resolution. - h.ReduceResolution(histogram.ExponentialSchemaMax) + if err := h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen with + // invalid data in a chunk. As this is a rare edge case + // of a rare edge case, we'd rather not create all the + // plumbing to handle this error gracefully. + panic(err) + } } return it.t, h @@ -1000,7 +1013,14 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int // chunk is from a newer Prometheus version that supports higher // resolution. fh = fh.Copy() - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen + // with invalid data in a chunk. As this is a + // rare edge case of a rare edge case, we'd + // rather not create all the plumbing to handle + // this error gracefully. + panic(err) + } } return it.t, fh } @@ -1039,7 +1059,13 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int // This is a very slow path, but it should only happen if the // chunk is from a newer Prometheus version that supports higher // resolution. - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen with + // invalid data in a chunk. As this is a rare edge case + // of a rare edge case, we'd rather not create all the + // plumbing to handle this error gracefully. + panic(err) + } } return it.t, fh diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 561810a3a5..5791f60df4 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -475,7 +475,9 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) // This is a very slow path, but it should only happen if the // record is from a newer Prometheus version that supports higher // resolution. - rh.H.ReduceResolution(histogram.ExponentialSchemaMax) + if err := rh.H.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return nil, fmt.Errorf("error reducing resolution of histogram #%d: %w", len(histograms)+1, err) + } } histograms = append(histograms, rh) @@ -579,7 +581,9 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr // This is a very slow path, but it should only happen if the // record is from a newer Prometheus version that supports higher // resolution. - rh.FH.ReduceResolution(histogram.ExponentialSchemaMax) + if err := rh.FH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return nil, fmt.Errorf("error reducing resolution of histogram #%d: %w", len(histograms)+1, err) + } } histograms = append(histograms, rh)