diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ed7aa52c8a..f6b7611e8e 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -292,6 +292,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { logger.Info("Enabling native ingestion of delta OTLP metrics, storing the raw sample values without conversion. WARNING: Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.") case "type-and-unit-labels": c.scrape.EnableTypeAndUnitLabels = true + c.web.EnableTypeAndUnitLabels = true logger.Info("Experimental type and unit labels enabled") case "use-uncached-io": c.tsdb.UseUncachedIO = true diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index ae6bb63dfb..4cbdceab6d 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -25,6 +25,7 @@ import ( "slices" "sort" "strconv" + "strings" "time" "unicode/utf8" @@ -118,7 +119,7 @@ var seps = []byte{'\xff'} // if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. // If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels. func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope scope, settings Settings, - ignoreAttrs []string, logOnOverwrite bool, extras ...string, + ignoreAttrs []string, logOnOverwrite bool, metadata prompb.MetricMetadata, extras ...string, ) []prompb.Label { resourceAttrs := resource.Attributes() serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) @@ -142,6 +143,9 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s if haveInstanceID { maxLabelCount++ } + if settings.EnableTypeAndUnitLabels { + maxLabelCount += 2 + } // Ensure attributes are sorted by key for consistent merging of keys which // collide when sanitized. @@ -186,6 +190,16 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s l["otel_scope_schema_url"] = scope.schemaURL } + if settings.EnableTypeAndUnitLabels { + unitNamer := otlptranslator.UnitNamer{UTF8Allowed: settings.AllowUTF8} + if metadata.Type != prompb.MetricMetadata_UNKNOWN { + l["__type__"] = strings.ToLower(metadata.Type.String()) + } + if metadata.Unit != "" { + l["__unit__"] = unitNamer.Build(metadata.Unit) + } + } + // Map service.name + service.namespace to job. if haveServiceName { val := serviceName.AsString() @@ -255,7 +269,7 @@ func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporali // However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets: // https://github.com/prometheus/prometheus/issues/13485. func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, - resource pcommon.Resource, settings Settings, baseName string, scope scope, + resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -264,7 +278,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false) + baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata) // If the sum is unset, it indicates the _sum metric point should be // omitted @@ -278,7 +292,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo sum.Value = math.Float64frombits(value.StaleNaN) } - sumlabels := createLabels(baseName+sumStr, baseLabels) + sumlabels := createLabels(metadata.MetricFamilyName+sumStr, baseLabels) c.addSample(sum, sumlabels) } @@ -291,7 +305,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo count.Value = math.Float64frombits(value.StaleNaN) } - countlabels := createLabels(baseName+countStr, baseLabels) + countlabels := createLabels(metadata.MetricFamilyName+countStr, baseLabels) c.addSample(count, countlabels) // cumulative count for conversion to cumulative histogram @@ -315,7 +329,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo bucket.Value = math.Float64frombits(value.StaleNaN) } boundStr := strconv.FormatFloat(bound, 'f', -1, 64) - labels := createLabels(baseName+bucketStr, baseLabels, leStr, boundStr) + labels := createLabels(metadata.MetricFamilyName+bucketStr, baseLabels, leStr, boundStr) ts := c.addSample(bucket, labels) bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: bound}) @@ -329,7 +343,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo } else { infBucket.Value = float64(pt.Count()) } - infLabels := createLabels(baseName+bucketStr, baseLabels, leStr, pInfStr) + infLabels := createLabels(metadata.MetricFamilyName+bucketStr, baseLabels, leStr, pInfStr) ts := c.addSample(infBucket, infLabels) bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)}) @@ -339,7 +353,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { - labels := createLabels(baseName+createdSuffix, baseLabels) + labels := createLabels(metadata.MetricFamilyName+createdSuffix, baseLabels) c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp()) } } @@ -465,7 +479,7 @@ func findMinAndMaxTimestamps(metric pmetric.Metric, minTimestamp, maxTimestamp p } func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, - settings Settings, baseName string, scope scope, + settings Settings, metadata prompb.MetricMetadata, scope scope, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -474,7 +488,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false) + baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata) // treat sum as a sample in an individual TimeSeries sum := &prompb.Sample{ @@ -485,7 +499,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin sum.Value = math.Float64frombits(value.StaleNaN) } // sum and count of the summary should append suffix to baseName - sumlabels := createLabels(baseName+sumStr, baseLabels) + sumlabels := createLabels(metadata.MetricFamilyName+sumStr, baseLabels) c.addSample(sum, sumlabels) // treat count as a sample in an individual TimeSeries @@ -496,7 +510,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin if pt.Flags().NoRecordedValue() { count.Value = math.Float64frombits(value.StaleNaN) } - countlabels := createLabels(baseName+countStr, baseLabels) + countlabels := createLabels(metadata.MetricFamilyName+countStr, baseLabels) c.addSample(count, countlabels) // process each percentile/quantile @@ -510,13 +524,13 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin quantile.Value = math.Float64frombits(value.StaleNaN) } percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) - qtlabels := createLabels(baseName, baseLabels, quantileStr, percentileStr) + qtlabels := createLabels(metadata.MetricFamilyName, baseLabels, quantileStr, percentileStr) c.addSample(quantile, qtlabels) } startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { - createdLabels := createLabels(baseName+createdSuffix, baseLabels) + createdLabels := createLabels(metadata.MetricFamilyName+createdSuffix, baseLabels) c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) } } @@ -542,6 +556,20 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr return labels } +// addTypeAndUnitLabels appends type and unit labels to the given labels slice. +func addTypeAndUnitLabels(labels []prompb.Label, metadata prompb.MetricMetadata, settings Settings) []prompb.Label { + unitNamer := otlptranslator.UnitNamer{UTF8Allowed: settings.AllowUTF8} + + labels = slices.DeleteFunc(labels, func(l prompb.Label) bool { + return l.Name == "__type__" || l.Name == "__unit__" + }) + + labels = append(labels, prompb.Label{Name: "__type__", Value: strings.ToLower(metadata.Type.String())}) + labels = append(labels, prompb.Label{Name: "__unit__", Value: unitNamer.Build(metadata.Unit)}) + + return labels +} + // getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. // Otherwise it creates a new one and returns that, and true. func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { @@ -627,7 +655,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, earlies // Do not pass identifying attributes as ignoreAttrs below. identifyingAttrs = nil } - labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, model.MetricNameLabel, name) + labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, name) haveIdentifier := false for _, l := range labels { if l.Name == model.JobLabel || l.Name == model.InstanceLabel { diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go index abf455937d..b2e491377f 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go @@ -531,7 +531,7 @@ func TestCreateAttributes(t *testing.T) { }), PromoteScopeMetadata: tc.promoteScope, } - lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, model.MetricNameLabel, "test_metric") + lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, "test_metric") require.ElementsMatch(t, lbls, tc.expectedLabels) }) @@ -746,7 +746,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { ExportCreatedMetric: true, PromoteScopeMetadata: tt.promoteScope, }, - metric.Name(), + prompb.MetricMetadata{MetricFamilyName: metric.Name()}, tt.scope, ) @@ -944,7 +944,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { ExportCreatedMetric: true, PromoteScopeMetadata: tt.promoteScope, }, - metric.Name(), + prompb.MetricMetadata{MetricFamilyName: metric.Name()}, tt.scope, ) @@ -988,3 +988,58 @@ func TestGetPromExemplars(t *testing.T) { require.Error(t, err) }) } + +func TestAddTypeAndUnitLabels(t *testing.T) { + testCases := []struct { + name string + inputLabels []prompb.Label + metadata prompb.MetricMetadata + expectedLabels []prompb.Label + }{ + { + name: "overwrites existing type and unit labels and preserves other labels", + inputLabels: []prompb.Label{ + {Name: "job", Value: "test-job"}, + {Name: "__type__", Value: "old_type"}, + {Name: "instance", Value: "test-instance"}, + {Name: "__unit__", Value: "old_unit"}, + {Name: "custom_label", Value: "custom_value"}, + }, + metadata: prompb.MetricMetadata{ + Type: prompb.MetricMetadata_COUNTER, + Unit: "seconds", + }, + expectedLabels: []prompb.Label{ + {Name: "job", Value: "test-job"}, + {Name: "instance", Value: "test-instance"}, + {Name: "custom_label", Value: "custom_value"}, + {Name: "__type__", Value: "counter"}, + {Name: "__unit__", Value: "seconds"}, + }, + }, + { + name: "adds type and unit labels when missing", + inputLabels: []prompb.Label{ + {Name: "job", Value: "test-job"}, + {Name: "instance", Value: "test-instance"}, + }, + metadata: prompb.MetricMetadata{ + Type: prompb.MetricMetadata_GAUGE, + Unit: "bytes", + }, + expectedLabels: []prompb.Label{ + {Name: "job", Value: "test-job"}, + {Name: "instance", Value: "test-instance"}, + {Name: "__type__", Value: "gauge"}, + {Name: "__unit__", Value: "bytes"}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := addTypeAndUnitLabels(tc.inputLabels, tc.metadata, Settings{AllowUTF8: false}) + require.ElementsMatch(t, tc.expectedLabels, result) + }) + } +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index 855e122213..421526926e 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -36,8 +36,7 @@ const defaultZeroThreshold = 1e-128 // addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series // as native histogram samples. func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice, - resource pcommon.Resource, settings Settings, promName string, temporality pmetric.AggregationTemporality, - scope scope, + resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, temporality pmetric.AggregationTemporality, scope scope, ) (annotations.Annotations, error) { var annots annotations.Annotations for x := 0; x < dataPoints.Len(); x++ { @@ -60,9 +59,11 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont settings, nil, true, + metadata, model.MetricNameLabel, - promName, + metadata.MetricFamilyName, ) + ts, _ := c.getOrCreateTimeSeries(lbls) ts.Histograms = append(ts.Histograms, histogram) @@ -253,8 +254,7 @@ func convertBucketsLayout(bucketCounts []uint64, offset, scaleDown int32, adjust } func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, - resource pcommon.Resource, settings Settings, promName string, temporality pmetric.AggregationTemporality, - scope scope, + resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, temporality pmetric.AggregationTemporality, scope scope, ) (annotations.Annotations, error) { var annots annotations.Annotations @@ -278,8 +278,9 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co settings, nil, true, + metadata, model.MetricNameLabel, - promName, + metadata.MetricFamilyName, ) ts, _ := c.getOrCreateTimeSeries(lbls) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go index 3fc6320d4e..eec7e95240 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go @@ -855,7 +855,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { ExportCreatedMetric: true, PromoteScopeMetadata: tt.promoteScope, }, - namer.Build(TranslatorMetricFromOtelMetric(metric)), + prompb.MetricMetadata{MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(metric))}, pmetric.AggregationTemporalityCumulative, tt.scope, ) @@ -1312,7 +1312,7 @@ func TestPrometheusConverter_addCustomBucketsHistogramDataPoints(t *testing.T) { ConvertHistogramsToNHCB: true, PromoteScopeMetadata: tt.promoteScope, }, - namer.Build(TranslatorMetricFromOtelMetric(metric)), + prompb.MetricMetadata{MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(metric))}, pmetric.AggregationTemporalityCumulative, tt.scope, ) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 7358cc0820..6488e1e7d1 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -53,7 +53,8 @@ type Settings struct { // LookbackDelta is the PromQL engine lookback delta. LookbackDelta time.Duration // PromoteScopeMetadata controls whether to promote OTel scope metadata to metric labels. - PromoteScopeMetadata bool + PromoteScopeMetadata bool + EnableTypeAndUnitLabels bool } // PrometheusConverter converts from OTel write format to Prometheus remote write format. @@ -170,13 +171,13 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric continue } - promName := namer.Build(TranslatorMetricFromOtelMetric(metric)) - c.metadata = append(c.metadata, prompb.MetricMetadata{ + metadata := prompb.MetricMetadata{ Type: otelMetricTypeToPromMetricType(metric), - MetricFamilyName: promName, + MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(metric)), Help: metric.Description(), Unit: metric.Unit(), - }) + } + c.metadata = append(c.metadata, metadata) // handle individual metrics based on type //exhaustive:enforce @@ -187,7 +188,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, promName, scope); err != nil { + if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil { errs = multierr.Append(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return @@ -199,7 +200,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName, scope); err != nil { + if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, metadata, scope); err != nil { errs = multierr.Append(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return @@ -213,7 +214,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric } if settings.ConvertHistogramsToNHCB { ws, err := c.addCustomBucketsHistogramDataPoints( - ctx, dataPoints, resource, settings, promName, temporality, scope, + ctx, dataPoints, resource, settings, metadata, temporality, scope, ) annots.Merge(ws) if err != nil { @@ -223,7 +224,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric } } } else { - if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, promName, scope); err != nil { + if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil { errs = multierr.Append(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return @@ -241,7 +242,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric dataPoints, resource, settings, - promName, + metadata, temporality, scope, ) @@ -258,7 +259,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, promName, scope); err != nil { + if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil { errs = multierr.Append(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go index df25e17be0..68a28c0eca 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go @@ -29,7 +29,7 @@ import ( ) func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, - resource pcommon.Resource, settings Settings, name string, scope scope, + resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -44,8 +44,9 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data settings, nil, true, + metadata, model.MetricNameLabel, - name, + metadata.MetricFamilyName, ) sample := &prompb.Sample{ // convert ns to ms @@ -60,6 +61,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data if pt.Flags().NoRecordedValue() { sample.Value = math.Float64frombits(value.StaleNaN) } + c.addSample(sample, labels) } @@ -67,7 +69,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data } func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, - resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string, scope scope, + resource pcommon.Resource, metric pmetric.Metric, settings Settings, metadata prompb.MetricMetadata, scope scope, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -82,8 +84,9 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo settings, nil, true, + metadata, model.MetricNameLabel, - name, + metadata.MetricFamilyName, ) sample := &prompb.Sample{ // convert ns to ms @@ -98,6 +101,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo if pt.Flags().NoRecordedValue() { sample.Value = math.Float64frombits(value.StaleNaN) } + ts := c.addSample(sample, lbls) if ts != nil { exemplars, err := getPromExemplars[pmetric.NumberDataPoint](ctx, &c.everyN, pt) @@ -118,7 +122,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo copy(createdLabels, lbls) for i, l := range createdLabels { if l.Name == model.MetricNameLabel { - createdLabels[i].Value = name + createdSuffix + createdLabels[i].Value = metadata.MetricFamilyName + createdSuffix break } } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go index dfe0a810c8..b166700b69 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go @@ -124,7 +124,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) { ExportCreatedMetric: true, PromoteScopeMetadata: tt.promoteScope, }, - metric.Name(), + prompb.MetricMetadata{MetricFamilyName: metric.Name()}, tt.scope, ) @@ -362,7 +362,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) { ExportCreatedMetric: true, PromoteScopeMetadata: tt.promoteScope, }, - metric.Name(), + prompb.MetricMetadata{MetricFamilyName: metric.Name()}, tt.scope, ) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 29ba1b1bd9..81e2681088 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -533,6 +533,8 @@ type OTLPOptions struct { // LookbackDelta is the query lookback delta. // Used to calculate the target_info sample timestamp interval. LookbackDelta time.Duration + // Add type and unit labels to the metrics. + EnableTypeAndUnitLabels bool } // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and @@ -548,9 +550,10 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl logger: logger, appendable: appendable, }, - config: configFunc, - allowDeltaTemporality: opts.NativeDelta, - lookbackDelta: opts.LookbackDelta, + config: configFunc, + allowDeltaTemporality: opts.NativeDelta, + lookbackDelta: opts.LookbackDelta, + enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels, } wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex} @@ -585,9 +588,10 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl type rwExporter struct { *writeHandler - config func() config.Config - allowDeltaTemporality bool - lookbackDelta time.Duration + config func() config.Config + allowDeltaTemporality bool + lookbackDelta time.Duration + enableTypeAndUnitLabels bool } func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -601,9 +605,10 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er PromoteResourceAttributes: otlptranslator.NewPromoteResourceAttributes(otlpCfg), KeepIdentifyingResourceAttributes: otlpCfg.KeepIdentifyingResourceAttributes, ConvertHistogramsToNHCB: otlpCfg.ConvertHistogramsToNHCB, + PromoteScopeMetadata: otlpCfg.PromoteScopeMetadata, AllowDeltaTemporality: rw.allowDeltaTemporality, LookbackDelta: rw.lookbackDelta, - PromoteScopeMetadata: otlpCfg.PromoteScopeMetadata, + EnableTypeAndUnitLabels: rw.enableTypeAndUnitLabels, }) if err != nil { rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index e6049fa8bb..cfe19345f6 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -384,12 +384,13 @@ func TestOTLPWriteHandler(t *testing.T) { timestamp := time.Now() exportRequest := generateOTLPWriteRequest(timestamp) for _, testCase := range []struct { - name string - otlpCfg config.OTLPConfig - expectedSamples []mockSample + name string + otlpCfg config.OTLPConfig + typeAndUnitLabels bool + expectedSamples []mockSample }{ { - name: "NoTranslation", + name: "NoTranslation/NoTypeAndUnitLabels", otlpCfg: config.OTLPConfig{ TranslationStrategy: config.NoTranslation, }, @@ -415,13 +416,42 @@ func TestOTLPWriteHandler(t *testing.T) { }, }, { - name: "UnderscoreEscapingWithSuffixes", + name: "NoTranslation/WithTypeAndUnitLabels", + otlpCfg: config.OTLPConfig{ + TranslationStrategy: config.NoTranslation, + }, + typeAndUnitLabels: true, + expectedSamples: []mockSample{ + { + l: labels.New(labels.Label{Name: "__name__", Value: "test.counter"}, + labels.Label{Name: "__type__", Value: "counter"}, + labels.Label{Name: "__unit__", Value: "bytes"}, + labels.Label{Name: "foo.bar", Value: "baz"}, + labels.Label{Name: "instance", Value: "test-instance"}, + labels.Label{Name: "job", Value: "test-service"}), + t: timestamp.UnixMilli(), + v: 10.0, + }, + { + l: labels.New( + labels.Label{Name: "__name__", Value: "target_info"}, + labels.Label{Name: "host.name", Value: "test-host"}, + labels.Label{Name: "instance", Value: "test-instance"}, + labels.Label{Name: "job", Value: "test-service"}, + ), + t: timestamp.UnixMilli(), + v: 1, + }, + }, + }, + { + name: "UnderscoreEscapingWithSuffixes/NoTypeAndUnitLabels", otlpCfg: config.OTLPConfig{ TranslationStrategy: config.UnderscoreEscapingWithSuffixes, }, expectedSamples: []mockSample{ { - l: labels.New(labels.Label{Name: "__name__", Value: "test_counter_total"}, + l: labels.New(labels.Label{Name: "__name__", Value: "test_counter_bytes_total"}, labels.Label{Name: "foo_bar", Value: "baz"}, labels.Label{Name: "instance", Value: "test-instance"}, labels.Label{Name: "job", Value: "test-service"}), @@ -467,13 +497,71 @@ func TestOTLPWriteHandler(t *testing.T) { }, }, { - name: "NoUTF8EscapingWithSuffixes", + name: "UnderscoreEscapingWithSuffixes/WithTypeAndUnitLabels", + otlpCfg: config.OTLPConfig{ + TranslationStrategy: config.UnderscoreEscapingWithSuffixes, + }, + typeAndUnitLabels: true, + expectedSamples: []mockSample{ + { + l: labels.New(labels.Label{Name: "__name__", Value: "test_counter_bytes_total"}, + labels.Label{Name: "__type__", Value: "counter"}, + labels.Label{Name: "__unit__", Value: "bytes"}, + labels.Label{Name: "foo_bar", Value: "baz"}, + labels.Label{Name: "instance", Value: "test-instance"}, + labels.Label{Name: "job", Value: "test-service"}), + t: timestamp.UnixMilli(), + v: 10.0, + }, + { + l: labels.New( + labels.Label{Name: "__name__", Value: "target_info"}, + labels.Label{Name: "host_name", Value: "test-host"}, + labels.Label{Name: "instance", Value: "test-instance"}, + labels.Label{Name: "job", Value: "test-service"}, + ), + t: timestamp.UnixMilli(), + v: 1, + }, + }, + }, + { + name: "NoUTF8EscapingWithSuffixes/NoTypeAndUnitLabels", otlpCfg: config.OTLPConfig{ TranslationStrategy: config.NoUTF8EscapingWithSuffixes, }, expectedSamples: []mockSample{ { - l: labels.New(labels.Label{Name: "__name__", Value: "test.counter_total"}, + l: labels.New(labels.Label{Name: "__name__", Value: "test.counter_bytes_total"}, + labels.Label{Name: "foo.bar", Value: "baz"}, + labels.Label{Name: "instance", Value: "test-instance"}, + labels.Label{Name: "job", Value: "test-service"}), + t: timestamp.UnixMilli(), + v: 10.0, + }, + { + l: labels.New( + labels.Label{Name: "__name__", Value: "target_info"}, + labels.Label{Name: "host.name", Value: "test-host"}, + labels.Label{Name: "instance", Value: "test-instance"}, + labels.Label{Name: "job", Value: "test-service"}, + ), + t: timestamp.UnixMilli(), + v: 1, + }, + }, + }, + { + name: "NoUTF8EscapingWithSuffixes/WithTypeAndUnitLabels", + otlpCfg: config.OTLPConfig{ + TranslationStrategy: config.NoUTF8EscapingWithSuffixes, + }, + typeAndUnitLabels: true, + expectedSamples: []mockSample{ + { + l: labels.New(labels.Label{Name: "__name__", Value: "test.counter_bytes_total"}, + labels.Label{Name: "__type__", Value: "counter"}, + labels.Label{Name: "__unit__", Value: "bytes"}, labels.Label{Name: "foo.bar", Value: "baz"}, labels.Label{Name: "instance", Value: "test-instance"}, labels.Label{Name: "job", Value: "test-service"}), @@ -494,7 +582,7 @@ func TestOTLPWriteHandler(t *testing.T) { }, } { t.Run(testCase.name, func(t *testing.T) { - appendable := handleOTLP(t, exportRequest, testCase.otlpCfg) + appendable := handleOTLP(t, exportRequest, testCase.otlpCfg, testCase.typeAndUnitLabels) for _, sample := range testCase.expectedSamples { requireContainsSample(t, appendable.samples, sample) } @@ -518,7 +606,7 @@ func requireContainsSample(t *testing.T, actual []mockSample, expected mockSampl "actual : %v", expected, actual)) } -func handleOTLP(t *testing.T, exportRequest pmetricotlp.ExportRequest, otlpCfg config.OTLPConfig) *mockAppendable { +func handleOTLP(t *testing.T, exportRequest pmetricotlp.ExportRequest, otlpCfg config.OTLPConfig, typeAndUnitLabels bool) *mockAppendable { buf, err := exportRequest.MarshalProto() require.NoError(t, err) @@ -531,7 +619,7 @@ func handleOTLP(t *testing.T, exportRequest pmetricotlp.ExportRequest, otlpCfg c return config.Config{ OTLPConfig: otlpCfg, } - }, OTLPOptions{}) + }, OTLPOptions{EnableTypeAndUnitLabels: typeAndUnitLabels}) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -559,6 +647,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest { counterMetric := scopeMetric.Metrics().AppendEmpty() counterMetric.SetName("test.counter") counterMetric.SetDescription("test-counter-description") + counterMetric.SetUnit("By") counterMetric.SetEmptySum() counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) counterMetric.Sum().SetIsMonotonic(true) @@ -579,6 +668,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest { gaugeMetric := scopeMetric.Metrics().AppendEmpty() gaugeMetric.SetName("test.gauge") gaugeMetric.SetDescription("test-gauge-description") + gaugeMetric.SetUnit("By") gaugeMetric.SetEmptyGauge() gaugeDataPoint := gaugeMetric.Gauge().DataPoints().AppendEmpty() @@ -590,6 +680,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest { histogramMetric := scopeMetric.Metrics().AppendEmpty() histogramMetric.SetName("test.histogram") histogramMetric.SetDescription("test-histogram-description") + histogramMetric.SetUnit("By") histogramMetric.SetEmptyHistogram() histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) @@ -605,6 +696,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest { exponentialHistogramMetric := scopeMetric.Metrics().AppendEmpty() exponentialHistogramMetric.SetName("test.exponential.histogram") exponentialHistogramMetric.SetDescription("test-exponential-histogram-description") + exponentialHistogramMetric.SetUnit("By") exponentialHistogramMetric.SetEmptyExponentialHistogram() exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 5002fad27e..b48212b2fd 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -265,6 +265,7 @@ func NewAPI( otlpEnabled, otlpDeltaToCumulative, otlpNativeDeltaIngestion bool, ctZeroIngestionEnabled bool, lookbackDelta time.Duration, + enableTypeAndUnitLabels bool, ) *API { a := &API{ QueryEngine: qe, @@ -312,9 +313,10 @@ func NewAPI( } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ - ConvertDelta: otlpDeltaToCumulative, - NativeDelta: otlpNativeDeltaIngestion, - LookbackDelta: lookbackDelta, + ConvertDelta: otlpDeltaToCumulative, + NativeDelta: otlpNativeDeltaIngestion, + LookbackDelta: lookbackDelta, + EnableTypeAndUnitLabels: enableTypeAndUnitLabels, }) } diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index b3e95e2243..dc685305e4 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -146,6 +146,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route false, false, 5*time.Minute, + false, ) promRouter := route.New().WithPrefix("/api/v1") diff --git a/web/web.go b/web/web.go index 7280255f8b..cd16a91967 100644 --- a/web/web.go +++ b/web/web.go @@ -293,6 +293,7 @@ type Options struct { NativeOTLPDeltaIngestion bool IsAgent bool CTZeroIngestionEnabled bool + EnableTypeAndUnitLabels bool AppName string AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg @@ -393,6 +394,7 @@ func New(logger *slog.Logger, o *Options) *Handler { o.NativeOTLPDeltaIngestion, o.CTZeroIngestionEnabled, o.LookbackDelta, + o.EnableTypeAndUnitLabels, ) if o.RoutePrefix != "/" {