diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index 730486772e..1d321218e7 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -228,6 +228,9 @@ func (c *PrometheusConverter) addHistogramDataPoints( } pt := dataPoints.At(x) + // Clear stale exemplars from the previous data point to prevent + // them from leaking into _sum and _count of this data point. + appOpts.Exemplars = nil timestamp := convertTimeStamp(pt.Timestamp()) startTimestamp := convertTimeStamp(pt.StartTimestamp()) baseLabels, err := c.createAttributes(pt.Attributes(), settings, reservedLabelNames, false, appOpts.Metadata) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go index c3fecc813b..f4f5283164 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/prompb" @@ -955,6 +956,121 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { } } +// TestAddHistogramDataPoints_ExemplarLeakAcrossDataPoints verifies that +// exemplars from a previous data point don't leak into _sum/_count of the +// next data point. Regression test for stale exemplar leak. +func TestAddHistogramDataPoints_ExemplarLeakAcrossDataPoints(t *testing.T) { + ts := pcommon.Timestamp(time.Now().UnixNano()) + exTs := pcommon.Timestamp(time.Now().Add(time.Second).UnixNano()) + + metric := pmetric.NewMetric() + metric.SetName("test_hist") + metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + // First data point: has buckets and an exemplar with value 200 (> bound 100, so falls into +Inf). + pt1 := metric.Histogram().DataPoints().AppendEmpty() + pt1.SetTimestamp(ts) + pt1.SetStartTimestamp(ts) + pt1.SetSum(42) + pt1.SetCount(10) + pt1.ExplicitBounds().FromRaw([]float64{100}) + pt1.BucketCounts().FromRaw([]uint64{7, 3}) + + ex := pt1.Exemplars().AppendEmpty() + ex.SetTimestamp(exTs) + ex.SetDoubleValue(200) // > 100, so falls into the +Inf bucket. + + // Second data point: no exemplars. + pt2 := metric.Histogram().DataPoints().AppendEmpty() + pt2.SetTimestamp(ts) + pt2.SetStartTimestamp(ts) + pt2.SetSum(84) + pt2.SetCount(20) + pt2.ExplicitBounds().FromRaw([]float64{100}) + pt2.BucketCounts().FromRaw([]uint64{14, 6}) + + appTest := teststorage.NewAppendable() + app := appTest.AppenderV2(t.Context()) + converter := NewPrometheusConverter(app) + settings := Settings{} + resource := pcommon.NewResource() + + require.NoError(t, converter.setResourceContext(resource, settings)) + require.NoError(t, converter.setScopeContext(scope{}, settings)) + require.NoError(t, converter.addHistogramDataPoints( + context.Background(), + metric.Histogram().DataPoints(), + settings, + storage.AOptions{ + MetricFamilyName: metric.Name(), + }, + )) + require.NoError(t, app.Commit()) + + exConverted := exemplar.Exemplar{ + Value: 200, + Ts: convertTimeStamp(exTs), + HasTs: true, + } + tsMs := convertTimeStamp(ts) + + want := []sample{ + // -- First data point -- + // _sum: no exemplars. + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_sum"), + T: tsMs, ST: tsMs, V: 42, + }, + // _count: no exemplars. + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_count"), + T: tsMs, ST: tsMs, V: 10, + }, + // le=100 bucket: no exemplars (exemplar value 200 > 100). + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_bucket", model.BucketLabel, "100"), + T: tsMs, ST: tsMs, V: 7, + }, + // le=+Inf bucket: gets the exemplar. + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_bucket", model.BucketLabel, "+Inf"), + T: tsMs, ST: tsMs, V: 10, + ES: []exemplar.Exemplar{exConverted}, + }, + // -- Second data point -- + // _sum: NO exemplars (this is the regression check). + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_sum"), + T: tsMs, ST: tsMs, V: 84, + }, + // _count: NO exemplars (this is the regression check). + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_count"), + T: tsMs, ST: tsMs, V: 20, + }, + // le=100 bucket: no exemplars. + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_bucket", model.BucketLabel, "100"), + T: tsMs, ST: tsMs, V: 14, + }, + // le=+Inf bucket: no exemplars. + { + MF: "test_hist", + L: labels.FromStrings(model.MetricNameLabel, "test_hist_bucket", model.BucketLabel, "+Inf"), + T: tsMs, ST: tsMs, V: 20, + }, + } + + teststorage.RequireEqual(t, want, appTest.ResultSamples()) +} + func TestGetPromExemplars(t *testing.T) { ctx := context.Background() c := NewPrometheusConverter(teststorage.NewAppendable().AppenderV2(t.Context()))