feature: type-and-unit-labels (extended MetricIdentity)

Experimental implementation of https://github.com/prometheus/proposals/pull/39

Previous (unmerged) experiments:
* https://github.com/prometheus/prometheus/compare/main...dashpole:prometheus:type_and_unit_labels
* https://github.com/prometheus/prometheus/pull/16025

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-03-18 11:00:12 +00:00
parent b0227d1f16
commit f5c852af72
25 changed files with 404 additions and 80 deletions

View file

@ -280,6 +280,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "otlp-deltatocumulative":
c.web.ConvertOTLPDelta = true
logger.Info("Converting delta OTLP metrics to cumulative")
case "type-and-unit-labels":
c.scrape.EnableTypeAndUnitLabels = true
logger.Info("Experimental type and unit labels enabled")
default:
logger.Warn("Unknown option for --enable-feature", "option", o)
}

View file

@ -184,3 +184,24 @@ Enabling this _can_ have negative impact on performance, because the in-memory
state is mutex guarded. Cumulative-only OTLP requests are not affected.
[d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor
## Type and Unit Labels
`--enable-feature=type-and-unit-labels`
When enabled, Prometheus will start injecting additional, special `__type__`
and `__unit__` labels that extend the existing `__name__` metric identity.
Those labels are injected from the metadata parts of OpenMetrics and other scrape
expositions, as well as Remote Write 2.0 and OTLP receive. All user-provided
`__type__` and `__unit__` labels will be dropped or overridden.
This is useful for users who:
* Want to be able to select metrics based on type or unit.
* Want to handle cases of series with the same metric name and different type and units,
  e.g. native histogram migrations or OpenTelemetry metrics from OTLP endpoint, without translation.
In the future, more work is planned that will depend on this, e.g. rich PromQL UX that helps
when wrong types are used on wrong functions, automatic renames, delta types and more.
See the [proposal](https://github.com/prometheus/proposals/pull/39) for details.

View file

@ -359,6 +359,25 @@ func (ls Labels) DropMetricName() Labels {
return ls
}
// DropMetricIdentity returns Labels without any label for which
// IsMetricIdentityLabel reports true. It is the DropMetricName counterpart
// covering every part of MetricIdentity.
// The receiver's backing array is never written to.
func (ls Labels) DropMetricIdentity() Labels {
	out := ls
	// kept counts the labels retained so far, which is also the position in out
	// of the label currently being examined.
	kept := 0
	for _, lbl := range ls {
		if !IsMetricIdentityLabel(lbl.Name) {
			kept++
			continue
		}
		if kept == 0 {
			// Common case: the identity label leads the set, so re-slicing
			// drops it without allocating.
			out = out[1:]
			continue
		}
		// Capping the left part with [:kept:kept] leaves it no spare capacity,
		// so append has to allocate instead of overwriting the original Labels.
		out = append(out[:kept:kept], out[kept+1:]...)
	}
	return out
}
// InternStrings calls intern on every string value inside ls, replacing them with what it returns.
func (ls *Labels) InternStrings(intern func(string) string) {
for i, l := range *ls {

View file

@ -18,13 +18,28 @@ import (
"encoding/json"
"slices"
"strconv"
"strings"
"unsafe"
"github.com/prometheus/common/model"
)
const (
MetricName = "__name__"
// MetricName is a special label name and selector for MetricIdentity.Name.
MetricName = "__name__"
// metricType is a special label name and selector for MetricIdentity.Type.
// Private to ensure __name__, __type__ and __unit__ are used together
// and remain extensible in Prometheus. See Labels.MetricIdentity,
// Builder.SetMetricIdentity and ScratchBuilder.AddMetricIdentity for access.
metricType = "__type__"
// metricUnit is a special label name and selector for MetricIdentity.Unit,
// which in the past used to be stored in metadata.
// Private to ensure __name__, __type__ and __unit__ are used together
// and remain extensible in Prometheus. See Labels.MetricIdentity,
// Builder.SetMetricIdentity and ScratchBuilder.AddMetricIdentity for access.
metricUnit = "__unit__"
AlertName = "alertname"
BucketLabel = "le"
InstanceName = "instance"
@ -33,6 +48,42 @@ const (
sep = '\xff' // Used between labels in `Bytes` and `Hash`.
)
// IsMetricIdentityLabel reports whether name is one of the special labels that
// make up the metric identity: MetricName, metricType or metricUnit.
func IsMetricIdentityLabel(name string) bool {
	switch name {
	case MetricName, metricType, metricUnit:
		return true
	default:
		return false
	}
}
// MetricIdentity groups the parts that together identify a metric: its name,
// extended with type and unit.
// Each "time series" is identifiable by MetricIdentity and other labels e.g. job.
type MetricIdentity struct {
// Name represents metric name (not always the same as metric family, until we
// have native, structured metric representation for all types).
// Empty means nameless metric (e.g. result of the PromQL function).
Name string
// Type of the metric. Empty ("") is equivalent to model.MetricTypeUnknown.
// In the past Prometheus used to store it in metadata.
Type model.MetricType
// Unit of the metric, regardless if encoded in the metric name. Empty means
// unitless metric (e.g. result of the PromQL function).
// In the past Prometheus used to store it in metadata.
Unit string
}
// String renders the identity as the name, followed by "~<unit>" when the unit
// is set and by ".<type>" when the type is set to something other than
// model.MetricTypeUnknown.
func (m MetricIdentity) String() string {
	var sb strings.Builder
	sb.WriteString(m.Name)
	if unit := m.Unit; unit != "" {
		sb.WriteByte('~')
		sb.WriteString(unit)
	}
	switch m.Type {
	case "", model.MetricTypeUnknown:
		// No type suffix for an absent or unknown type.
	default:
		sb.WriteByte('.')
		sb.WriteString(string(m.Type))
	}
	return sb.String()
}
var seps = []byte{sep} // Used with Hash, which has no WriteByte method.
// Label is a key/value pair of strings.
@ -40,6 +91,65 @@ type Label struct {
Name, Value string
}
// MetricIdentity assembles the metric identity parts from the special
// name, type and unit labels of ls. A missing (empty) type label is reported
// as model.MetricTypeUnknown.
func (ls Labels) MetricIdentity() MetricIdentity {
	mid := MetricIdentity{
		Name: ls.Get(MetricName),
		Type: model.MetricTypeUnknown,
		Unit: ls.Get(metricUnit),
	}
	if t := ls.Get(metricType); t != "" {
		mid.Type = model.MetricType(t)
	}
	return mid
}
// SetMetricIdentity injects the metric identity parts into the labels being built
// and returns the builder.
// Empty fields of the given MetricIdentity, as well as an unknown metric type,
// cause removal of the corresponding existing labels.
func (b *Builder) SetMetricIdentity(mid MetricIdentity) *Builder {
	// Unknown is semantically the same as empty, so it is passed on as an empty
	// value and the type label gets removed as promised above.
	typ := string(mid.Type)
	if mid.Type == model.MetricTypeUnknown {
		typ = ""
	}

	b.Set(MetricName, mid.Name)
	b.Set(metricType, typ)
	b.Set(metricUnit, mid.Unit)
	return b
}
// IgnoreIdentityLabelsScratchBuilder wraps a ScratchBuilder so that special
// metric identity labels passed to Add are silently skipped.
type IgnoreIdentityLabelsScratchBuilder struct {
	*ScratchBuilder
}

// Add forwards the name/value pair to the wrapped ScratchBuilder, except when
// name is a special metric identity label (e.g. __name__, __type__, __unit__),
// in which case nothing is added.
// Note if you Add the same name twice you will get a duplicate label, which is invalid.
func (b IgnoreIdentityLabelsScratchBuilder) Add(name, value string) {
	if !IsMetricIdentityLabel(name) {
		b.ScratchBuilder.Add(name, value)
	}
}
// AddMetricIdentity adds the metric identity parts as labels, in the order
// name, type, unit.
// Empty fields of the given MetricIdentity, as well as an unknown metric type,
// are skipped.
//
//nolint:revive // unexported type
func (b *ScratchBuilder) AddMetricIdentity(mid MetricIdentity) {
	// Treat unknown like empty so the single emptiness check below covers both.
	typ := string(mid.Type)
	if mid.Type == model.MetricTypeUnknown {
		typ = ""
	}

	parts := [...]struct{ name, value string }{
		{name: MetricName, value: mid.Name},
		{name: metricType, value: typ},
		{name: metricUnit, value: mid.Unit},
	}
	for _, part := range parts {
		if part.value != "" {
			b.Add(part.name, part.value)
		}
	}
}
func (ls Labels) String() string {
var bytea [1024]byte // On stack to avoid memory allocation while building the output.
b := bytes.NewBuffer(bytea[:0])

View file

@ -555,6 +555,7 @@ func (ls Labels) ReleaseStrings(release func(string)) {
}
// DropMetricName returns Labels with "__name__" removed.
//
// Deprecated: Use DropMetricIdentity instead to handle type and unit correctly.
func (ls Labels) DropMetricName() Labels {
for i := 0; i < len(ls.data); {
lName, i2 := decodeString(ls.syms, ls.data, i)
@ -574,6 +575,27 @@ func (ls Labels) DropMetricName() Labels {
return ls
}
// DropMetricIdentity is the DropMetricName counterpart that removes every label
// reported by IsMetricIdentityLabel, not only the metric name.
func (ls Labels) DropMetricIdentity() Labels {
	for pos := 0; pos < len(ls.data); {
		name, next := decodeString(ls.syms, ls.data, pos)
		// Advance past the value entry; its content is not needed here.
		_, next = decodeVarint(ls.data, next)
		if name[0] > '_' {
			// Gone past the special labels, nothing more to drop.
			break
		}
		if !IsMetricIdentityLabel(name) {
			pos = next
			continue
		}
		// Cut the entry out and re-examine the same position, which now holds
		// whatever followed the removed entry.
		if pos == 0 {
			// Common case: re-slice without building a new string.
			ls.data = ls.data[next:]
		} else {
			ls.data = ls.data[:pos] + ls.data[next:]
		}
	}
	return ls
}
// Builder allows modifying Labels.
type Builder struct {
syms *SymbolTable

View file

@ -439,6 +439,28 @@ func (ls Labels) DropMetricName() Labels {
return ls
}
// DropMetricIdentity is the DropMetricName counterpart that removes every label
// reported by IsMetricIdentityLabel, not only the metric name.
func (ls Labels) DropMetricIdentity() Labels {
	pos := 0
	for pos < len(ls.data) {
		name, next := decodeString(ls.data, pos)
		valueLen, next := decodeSize(ls.data, next)
		next += valueLen // next now points just past this label's value.
		if name[0] > '_' {
			// Gone past the special labels, nothing more to drop.
			break
		}
		switch {
		case !IsMetricIdentityLabel(name):
			pos = next
		case pos == 0:
			// Common case: re-slice without building a new string.
			ls.data = ls.data[next:]
		default:
			// Cut the entry out; pos stays put so the entry that followed the
			// removed one is examined next.
			ls.data = ls.data[:pos] + ls.data[next:]
		}
	}
	return ls
}
// InternStrings is a no-op because it would only save when the whole set of labels is identical.
func (ls *Labels) InternStrings(intern func(string) string) {
}

View file

@ -513,11 +513,23 @@ func TestLabels_DropMetricName(t *testing.T) {
require.True(t, Equal(FromStrings("aaa", "111"), FromStrings(MetricName, "myname", "aaa", "111").DropMetricName()))
original := FromStrings("__aaa__", "111", MetricName, "myname", "bbb", "222")
check := FromStrings("__aaa__", "111", MetricName, "myname", "bbb", "222")
check := original.Copy()
require.True(t, Equal(FromStrings("__aaa__", "111", "bbb", "222"), check.DropMetricName()))
require.True(t, Equal(original, check))
}
// TestLabels_DropMetricIdentity checks that DropMetricIdentity strips the name,
// type and unit labels, keeps everything else, and leaves its receiver equal to
// what it was before the call.
func TestLabels_DropMetricIdentity(t *testing.T) {
	// requireDropped asserts that dropping the identity from the labels built
	// from in yields the labels built from want.
	requireDropped := func(want []string, in ...string) {
		require.True(t, Equal(FromStrings(want...), FromStrings(in...).DropMetricIdentity()))
	}
	counter := string(model.MetricTypeCounter)
	onlyAAA := []string{"aaa", "111"}

	requireDropped([]string{"aaa", "111", "bbb", "222"}, "aaa", "111", "bbb", "222")
	requireDropped(onlyAAA, MetricName, "myname", "aaa", "111")
	requireDropped(onlyAAA, MetricName, "myname", metricType, counter, "aaa", "111")
	requireDropped(onlyAAA, MetricName, "myname", metricType, counter, metricUnit, "seconds", "aaa", "111")

	// Dropping from the middle must not alter the labels it was called on.
	reference := FromStrings("__aaa__", "111", MetricName, "myname", "bbb", "222")
	subject := reference.Copy()
	require.True(t, Equal(FromStrings("__aaa__", "111", "bbb", "222"), subject.DropMetricIdentity()))
	require.True(t, Equal(reference, subject))
}
func ScratchBuilderForBenchmark() ScratchBuilder {
// (Only relevant to -tags dedupelabels: stuff the symbol table before adding the real labels, to avoid having everything fitting into 1 byte.)
b := NewScratchBuilder(256)

View file

@ -144,10 +144,12 @@ func benchParse(b *testing.B, data []byte, parser string) {
var newParserFn newParser
switch parser {
case "promtext":
newParserFn = NewPromParser
newParserFn = func(b []byte, st *labels.SymbolTable) Parser {
return NewPromParser(b, st, false)
}
case "promproto":
newParserFn = func(b []byte, st *labels.SymbolTable) Parser {
return NewProtobufParser(b, true, st)
return NewProtobufParser(b, true, false, st)
}
case "omtext":
newParserFn = func(b []byte, st *labels.SymbolTable) Parser {
@ -273,7 +275,7 @@ func BenchmarkCreatedTimestampPromProto(b *testing.B) {
data := createTestProtoBuf(b).Bytes()
st := labels.NewSymbolTable()
p := NewProtobufParser(data, true, st)
p := NewProtobufParser(data, true, false, st)
found := false
Inner:

View file

@ -51,11 +51,13 @@ type Parser interface {
// Type returns the metric name and type in the current entry.
// Must only be called after Next returned a type entry.
// The returned byte slices become invalid after the next call to Next.
// TODO(bwplotka): Once type-and-unit-labels stabilizes we could remove this method.
Type() ([]byte, model.MetricType)
// Unit returns the metric name and unit in the current entry.
// Must only be called after Next returned a unit entry.
// The returned byte slices become invalid after the next call to Next.
// TODO(bwplotka): Once type-and-unit-labels stabilizes we could remove this method.
Unit() ([]byte, []byte)
// Comment returns the text of the current comment.
@ -128,19 +130,20 @@ func extractMediaType(contentType, fallbackType string) (string, error) {
// An error may also be returned if fallbackType had to be used or there was some
// other error parsing the supplied Content-Type.
// If the returned parser is nil then the scrape must fail.
func New(b []byte, contentType, fallbackType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) {
func New(b []byte, contentType, fallbackType string, parseClassicHistograms, skipOMCTSeries, enableTypeAndUnitLabels bool, st *labels.SymbolTable) (Parser, error) {
mediaType, err := extractMediaType(contentType, fallbackType)
// err may be nil or something we want to warn about.
switch mediaType {
case "application/openmetrics-text":
return NewOpenMetricsParser(b, st, func(o *openMetricsParserOptions) {
o.SkipCTSeries = skipOMCTSeries
o.skipCTSeries = skipOMCTSeries
o.enableTypeAndUnitLabels = enableTypeAndUnitLabels
}), err
case "application/vnd.google.protobuf":
return NewProtobufParser(b, parseClassicHistograms, st), err
return NewProtobufParser(b, parseClassicHistograms, enableTypeAndUnitLabels, st), err
case "text/plain":
return NewPromParser(b, st), err
return NewPromParser(b, st, enableTypeAndUnitLabels), err
default:
return nil, err
}

View file

@ -168,7 +168,7 @@ func TestNewParser(t *testing.T) {
fallbackProtoMediaType := tt.fallbackScrapeProtocol.HeaderMediaType()
p, err := New([]byte{}, tt.contentType, fallbackProtoMediaType, false, false, labels.NewSymbolTable())
p, err := New([]byte{}, tt.contentType, fallbackProtoMediaType, false, false, false, labels.NewSymbolTable())
tt.validateParser(t, p)
if tt.err == "" {
require.NoError(t, err)

View file

@ -599,7 +599,7 @@ func TestNHCBParser_NoNHCBWhenExponential(t *testing.T) {
func() (string, parserFactory, []int, parserOptions) {
factory := func(keepClassic bool) Parser {
inputBuf := createTestProtoBufHistogram(t)
return NewProtobufParser(inputBuf.Bytes(), keepClassic, labels.NewSymbolTable())
return NewProtobufParser(inputBuf.Bytes(), keepClassic, false, labels.NewSymbolTable())
}
return "ProtoBuf", factory, []int{1, 2, 3}, parserOptions{useUTF8sep: true, hasCreatedTimeStamp: true}
},
@ -613,7 +613,7 @@ func TestNHCBParser_NoNHCBWhenExponential(t *testing.T) {
func() (string, parserFactory, []int, parserOptions) {
factory := func(_ bool) Parser {
input := createTestPromHistogram()
return NewPromParser([]byte(input), labels.NewSymbolTable())
return NewPromParser([]byte(input), labels.NewSymbolTable(), false)
}
return "Prometheus", factory, []int{1}, parserOptions{}
},

View file

@ -81,10 +81,12 @@ type OpenMetricsParser struct {
mfNameLen int // length of metric family name to get from series.
text []byte
mtype model.MetricType
val float64
ts int64
hasTS bool
start int
unit string
val float64
ts int64
hasTS bool
start int
// offsets is a list of offsets into series that describe the positions
// of the metric name and label names and values for this series.
// p.offsets[0] is the start character of the metric name.
@ -106,12 +108,14 @@ type OpenMetricsParser struct {
ignoreExemplar bool
// visitedMFName is the metric family name of the last visited metric when peeking ahead
// for _created series during the execution of the CreatedTimestamp method.
visitedMFName []byte
skipCTSeries bool
visitedMFName []byte
skipCTSeries bool
enableTypeAndUnitLabels bool
}
type openMetricsParserOptions struct {
SkipCTSeries bool
skipCTSeries bool
enableTypeAndUnitLabels bool
}
type OpenMetricsOption func(*openMetricsParserOptions)
@ -125,7 +129,15 @@ type OpenMetricsOption func(*openMetricsParserOptions)
// best-effort compatibility.
func WithOMParserCTSeriesSkipped() OpenMetricsOption {
return func(o *openMetricsParserOptions) {
o.SkipCTSeries = true
o.skipCTSeries = true
}
}
// WithOMParserTypeAndUnitLabels returns an option that switches on
// type-and-unit-labels mode, in which the parser injects __type__ and __unit__
// into labels.
func WithOMParserTypeAndUnitLabels() OpenMetricsOption {
	enable := func(opts *openMetricsParserOptions) {
		opts.enableTypeAndUnitLabels = true
	}
	return enable
}
@ -138,9 +150,10 @@ func NewOpenMetricsParser(b []byte, st *labels.SymbolTable, opts ...OpenMetricsO
}
parser := &OpenMetricsParser{
l: &openMetricsLexer{b: b},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
skipCTSeries: options.SkipCTSeries,
l: &openMetricsLexer{b: b},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
skipCTSeries: options.skipCTSeries,
enableTypeAndUnitLabels: options.enableTypeAndUnitLabels,
}
return parser
@ -187,7 +200,7 @@ func (p *OpenMetricsParser) Type() ([]byte, model.MetricType) {
// Must only be called after Next returned a unit entry.
// The returned byte slices become invalid after the next call to Next.
func (p *OpenMetricsParser) Unit() ([]byte, []byte) {
return p.l.b[p.offsets[0]:p.offsets[1]], p.text
return p.l.b[p.offsets[0]:p.offsets[1]], []byte(p.unit)
}
// Comment returns the text of the current comment.
@ -203,12 +216,24 @@ func (p *OpenMetricsParser) Labels(l *labels.Labels) {
p.builder.Reset()
metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start])
p.builder.Add(labels.MetricName, metricName)
if p.enableTypeAndUnitLabels {
p.builder.AddMetricIdentity(labels.MetricIdentity{
Name: metricName,
Type: p.mtype,
Unit: p.unit,
})
} else {
p.builder.Add(labels.MetricName, metricName)
}
for i := 2; i < len(p.offsets); i += 4 {
a := p.offsets[i] - p.start
b := p.offsets[i+1] - p.start
label := unreplace(s[a:b])
if p.enableTypeAndUnitLabels && labels.IsMetricIdentityLabel(label) {
// Dropping user provided id labels if needed.
continue
}
c := p.offsets[i+2] - p.start
d := p.offsets[i+3] - p.start
value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d]))
@ -493,11 +518,11 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
case tType:
return EntryType, nil
case tUnit:
p.unit = string(p.text)
m := yoloString(p.l.b[p.offsets[0]:p.offsets[1]])
u := yoloString(p.text)
if len(u) > 0 {
if !strings.HasSuffix(m, u) || len(m) < len(u)+1 || p.l.b[p.offsets[1]-len(u)-1] != '_' {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", u, m)
if len(p.unit) > 0 {
if !strings.HasSuffix(m, p.unit) || len(m) < len(p.unit)+1 || p.l.b[p.offsets[1]-len(p.unit)-1] != '_' {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", p.unit, m)
}
}
return EntryUnit, nil

View file

@ -468,7 +468,7 @@ foobar{quantile="0.99"} 150.1`
requireEntries(t, exp, got)
}
func TestUTF8OpenMetricsParse(t *testing.T) {
func TestOpenMetricsParse_UTF8(t *testing.T) {
input := `# HELP "go.gc_duration_seconds" A summary of the GC invocation durations.
# TYPE "go.gc_duration_seconds" summary
# UNIT "go.gc_duration_seconds" seconds
@ -554,6 +554,50 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
requireEntries(t, exp, got)
}
// TestOpenMetricsParse_EnableTypeAndUnitLabels parses a small summary family
// with the type-and-unit-labels option on and expects every sample's label set
// to carry __type__ and __unit__ taken from the TYPE and UNIT lines.
func TestOpenMetricsParse_EnableTypeAndUnitLabels(t *testing.T) {
input := `# HELP "go.gc_duration_seconds" A summary of the GC invocation durations.
# TYPE "go.gc_duration_seconds" summary
# UNIT "go.gc_duration_seconds" seconds
{"go.gc_duration_seconds",quantile="0"} 4.9351e-05
{"go.gc_duration_seconds",quantile="0.25"} 7.424100000000001e-05
{"go.gc_duration_seconds_created"} 1520872607.123
{"go.gc_duration_seconds",quantile="0.5",a="b"} 8.3835e-05
`
input += "# EOF\n"
// Expected entries: HELP, TYPE and UNIT metadata first, then the samples.
// No entry is expected for the _created line itself; its timestamp appears
// as ct on the first two samples.
exp := []parsedEntry{
{
m: "go.gc_duration_seconds",
help: "A summary of the GC invocation durations.",
}, {
m: "go.gc_duration_seconds",
typ: model.MetricTypeSummary,
}, {
m: "go.gc_duration_seconds",
unit: "seconds",
}, {
m: `{"go.gc_duration_seconds",quantile="0"}`,
v: 4.9351e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "__type__", "summary", "__unit__", "seconds", "quantile", "0.0"),
ct: 1520872607123,
}, {
m: `{"go.gc_duration_seconds",quantile="0.25"}`,
v: 7.424100000000001e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "__type__", "summary", "__unit__", "seconds", "quantile", "0.25"),
ct: 1520872607123,
}, {
m: `{"go.gc_duration_seconds",quantile="0.5",a="b"}`,
v: 8.3835e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "__type__", "summary", "__unit__", "seconds", "quantile", "0.5", "a", "b"),
},
}
// Both options are on: created-timestamp series skipping and type/unit label injection.
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped(), WithOMParserTypeAndUnitLabels())
got := testParse(t, p)
requireEntries(t, exp, got)
}
func TestOpenMetricsParseErrors(t *testing.T) {
cases := []struct {
input string

View file

@ -160,16 +160,19 @@ type PromParser struct {
// of the metric name and label names and values for this series.
// p.offsets[0] is the start character of the metric name.
// p.offsets[1] is the end of the metric name.
// Subsequently, p.offsets is a pair of pair of offsets for the positions
// Subsequently, p.offsets is a pair of offsets for the positions
// of the label name and value start and end characters.
offsets []int
enableTypeAndUnitLabels bool
}
// NewPromParser returns a new parser of the byte slice.
func NewPromParser(b []byte, st *labels.SymbolTable) Parser {
func NewPromParser(b []byte, st *labels.SymbolTable, enableTypeAndUnitLabels bool) Parser {
return &PromParser{
l: &promlexer{b: append(b, '\n')},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
l: &promlexer{b: append(b, '\n')},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
}
}
@ -229,12 +232,23 @@ func (p *PromParser) Labels(l *labels.Labels) {
p.builder.Reset()
metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start])
p.builder.Add(labels.MetricName, metricName)
if p.enableTypeAndUnitLabels {
p.builder.AddMetricIdentity(labels.MetricIdentity{
Name: metricName,
Type: p.mtype,
})
} else {
p.builder.Add(labels.MetricName, metricName)
}
for i := 2; i < len(p.offsets); i += 4 {
a := p.offsets[i] - p.start
b := p.offsets[i+1] - p.start
label := unreplace(s[a:b])
if p.enableTypeAndUnitLabels && labels.IsMetricIdentityLabel(label) {
// Dropping user provided id labels if needed.
continue
}
c := p.offsets[i+2] - p.start
d := p.offsets[i+3] - p.start
value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d]))

View file

@ -199,7 +199,7 @@ testmetric{le="10"} 1`
},
}
p := NewPromParser([]byte(input), labels.NewSymbolTable())
p := NewPromParser([]byte(input), labels.NewSymbolTable(), false)
got := testParse(t, p)
requireEntries(t, exp, got)
}
@ -274,7 +274,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
},
}
p := NewPromParser([]byte(input), labels.NewSymbolTable())
p := NewPromParser([]byte(input), labels.NewSymbolTable(), false)
got := testParse(t, p)
requireEntries(t, exp, got)
}
@ -355,7 +355,7 @@ func TestPromParseErrors(t *testing.T) {
}
for i, c := range cases {
p := NewPromParser([]byte(c.input), labels.NewSymbolTable())
p := NewPromParser([]byte(c.input), labels.NewSymbolTable(), false)
var err error
for err == nil {
_, err = p.Next()
@ -408,7 +408,7 @@ func TestPromNullByteHandling(t *testing.T) {
}
for i, c := range cases {
p := NewPromParser([]byte(c.input), labels.NewSymbolTable())
p := NewPromParser([]byte(c.input), labels.NewSymbolTable(), false)
var err error
for err == nil {
_, err = p.Next()

View file

@ -78,18 +78,20 @@ type ProtobufParser struct {
// Whether to also parse a classic histogram that is also present as a
// native histogram.
parseClassicHistograms bool
parseClassicHistograms bool
enableTypeAndUnitLabels bool
}
// NewProtobufParser returns a parser for the payload in the byte slice.
func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable) Parser {
func NewProtobufParser(b []byte, parseClassicHistograms bool, enableTypeAndUnitLabels bool, st *labels.SymbolTable) Parser {
return &ProtobufParser{
dec: dto.NewMetricStreamingDecoder(b),
entryBytes: &bytes.Buffer{},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16), // TODO(bwplotka): Try base builder.
state: EntryInvalid,
parseClassicHistograms: parseClassicHistograms,
state: EntryInvalid,
parseClassicHistograms: parseClassicHistograms,
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
}
}
@ -552,10 +554,22 @@ func (p *ProtobufParser) Next() (Entry, error) {
// * p.fieldsDone depending on p.fieldPos.
func (p *ProtobufParser) onSeriesOrHistogramUpdate() error {
p.builder.Reset()
p.builder.Add(labels.MetricName, p.getMagicName())
if err := p.dec.Label(&p.builder); err != nil {
return err
if p.enableTypeAndUnitLabels {
_, typ := p.Type()
p.builder.AddMetricIdentity(labels.MetricIdentity{
Name: p.getMagicName(),
Type: typ,
Unit: p.dec.GetUnit(),
})
if err := p.dec.Label(labels.IgnoreIdentityLabelsScratchBuilder{ScratchBuilder: &p.builder}); err != nil {
return err
}
} else {
p.builder.Add(labels.MetricName, p.getMagicName())
if err := p.dec.Label(&p.builder); err != nil {
return err
}
}
if needed, name, value := p.getMagicLabel(); needed {

View file

@ -833,7 +833,7 @@ func TestProtobufParse(t *testing.T) {
}{
{
name: "ignore classic buckets of native histograms",
parser: NewProtobufParser(inputBuf.Bytes(), false, labels.NewSymbolTable()),
parser: NewProtobufParser(inputBuf.Bytes(), false, false, labels.NewSymbolTable()),
expected: []parsedEntry{
{
m: "go_build_info",
@ -1468,7 +1468,7 @@ func TestProtobufParse(t *testing.T) {
},
{
name: "parse classic and native buckets",
parser: NewProtobufParser(inputBuf.Bytes(), true, labels.NewSymbolTable()),
parser: NewProtobufParser(inputBuf.Bytes(), true, false, labels.NewSymbolTable()),
expected: []parsedEntry{
{
m: "go_build_info",

View file

@ -23,8 +23,6 @@ import (
proto "github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
type MetricStreamingDecoder struct {
@ -153,12 +151,16 @@ func (m *MetricStreamingDecoder) GetLabel() {
panic("don't use GetLabel, use Label instead")
}
// scratchBuilder is the label sink that Label and parseLabel write into:
// anything that accepts name/value pairs through Add.
type scratchBuilder interface {
// Add appends a single label name/value pair.
Add(name, value string)
}
// Label parses labels into labels scratch builder. Metric name is missing
// given the protobuf metric model and has to be deduced from the metric family name.
// TODO: The method name intentionally hides MetricStreamingDecoder.Metric.Label
// field to avoid direct use (it's not parsed). In future generator will generate
// structs tailored for streaming decoding.
func (m *MetricStreamingDecoder) Label(b *labels.ScratchBuilder) error {
func (m *MetricStreamingDecoder) Label(b scratchBuilder) error {
for _, l := range m.labels {
if err := parseLabel(m.mData[l.start:l.end], b); err != nil {
return err
@ -169,7 +171,7 @@ func (m *MetricStreamingDecoder) Label(b *labels.ScratchBuilder) error {
// parseLabel is essentially LabelPair.Unmarshal but directly adding into scratch builder
// and reusing strings.
func parseLabel(dAtA []byte, b *labels.ScratchBuilder) error {
func parseLabel(dAtA []byte, b scratchBuilder) error {
var name, value string
l := len(dAtA)
iNdEx := 0

View file

@ -1765,7 +1765,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
it.Reset(chkIter)
metric := selVS.Series[i].Labels()
if !ev.enableDelayedNameRemoval && dropName {
metric = metric.DropMetricName()
metric = metric.DropMetricIdentity()
}
ss := Series{
Metric: metric,
@ -1904,7 +1904,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
if e.Op == parser.SUB {
for i := range mat {
if !ev.enableDelayedNameRemoval {
mat[i].Metric = mat[i].Metric.DropMetricName()
mat[i].Metric = mat[i].Metric.DropMetricIdentity()
}
mat[i].DropName = true
for j := range mat[i].Floats {
@ -2653,7 +2653,7 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
}
metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh)
if !ev.enableDelayedNameRemoval && returnBool {
metric = metric.DropMetricName()
metric = metric.DropMetricIdentity()
}
insertedSigs, exists := matchedSigs[sig]
if matching.Card == parser.CardOneToOne {
@ -2720,8 +2720,9 @@ func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.V
}
str := string(enh.lblResultBuf)
if shouldDropMetricName(op) {
enh.lb.Del(labels.MetricName)
if shouldDropMetricIdentity(op) {
// Setting to empty fields will cause the deletion of those.
enh.lb.SetMetricIdentity(labels.MetricIdentity{})
}
if matching.Card == parser.CardOneToOne {
@ -2780,9 +2781,9 @@ func (ev *evaluator) VectorscalarBinop(op parser.ItemType, lhs Vector, rhs Scala
if keep {
lhsSample.F = float
lhsSample.H = histogram
if shouldDropMetricName(op) || returnBool {
if shouldDropMetricIdentity(op) || returnBool {
if !ev.enableDelayedNameRemoval {
lhsSample.Metric = lhsSample.Metric.DropMetricName()
lhsSample.Metric = lhsSample.Metric.DropMetricIdentity()
}
lhsSample.DropName = true
}
@ -3440,7 +3441,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) {
mat := v.(Matrix)
for i := range mat {
if mat[i].DropName {
mat[i].Metric = mat[i].Metric.DropMetricName()
mat[i].Metric = mat[i].Metric.DropMetricIdentity()
}
}
if mat.ContainsSameLabelset() {
@ -3450,7 +3451,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) {
vec := v.(Vector)
for i := range vec {
if vec[i].DropName {
vec[i].Metric = vec[i].Metric.DropMetricName()
vec[i].Metric = vec[i].Metric.DropMetricIdentity()
}
}
if vec.ContainsSameLabelset() {
@ -3552,9 +3553,9 @@ func btos(b bool) float64 {
return 0
}
// shouldDropMetricName returns whether the metric name should be dropped in the
// shouldDropMetricIdentity returns whether the metric name, type and unit should be dropped in the
// result of the op operation.
func shouldDropMetricName(op parser.ItemType) bool {
func shouldDropMetricIdentity(op parser.ItemType) bool {
switch op {
case parser.ADD, parser.SUB, parser.DIV, parser.MUL, parser.POW, parser.MOD, parser.ATAN2:
return true

View file

@ -578,7 +578,7 @@ func clamp(vec Vector, minVal, maxVal float64, enh *EvalNodeHelper) (Vector, ann
continue
}
if !enh.enableDelayedNameRemoval {
el.Metric = el.Metric.DropMetricName()
el.Metric = el.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: el.Metric,
@ -630,7 +630,7 @@ func funcRound(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper
}
f := math.Floor(el.F*toNearestInverse+0.5) / toNearestInverse
if !enh.enableDelayedNameRemoval {
el.Metric = el.Metric.DropMetricName()
el.Metric = el.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: el.Metric,
@ -1014,7 +1014,7 @@ func simpleFunc(vals []parser.Value, enh *EvalNodeHelper, f func(float64) float6
for _, el := range vals[0].(Vector) {
if el.H == nil { // Process only float samples.
if !enh.enableDelayedNameRemoval {
el.Metric = el.Metric.DropMetricName()
el.Metric = el.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: el.Metric,
@ -1164,7 +1164,7 @@ func funcTimestamp(vals []parser.Value, _ parser.Expressions, enh *EvalNodeHelpe
vec := vals[0].(Vector)
for _, el := range vec {
if !enh.enableDelayedNameRemoval {
el.Metric = el.Metric.DropMetricName()
el.Metric = el.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: el.Metric,
@ -1294,7 +1294,7 @@ func funcHistogramCount(vals []parser.Value, _ parser.Expressions, enh *EvalNode
continue
}
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropMetricName()
sample.Metric = sample.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: sample.Metric,
@ -1315,7 +1315,7 @@ func funcHistogramSum(vals []parser.Value, _ parser.Expressions, enh *EvalNodeHe
continue
}
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropMetricName()
sample.Metric = sample.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: sample.Metric,
@ -1336,7 +1336,7 @@ func funcHistogramAvg(vals []parser.Value, _ parser.Expressions, enh *EvalNodeHe
continue
}
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropMetricName()
sample.Metric = sample.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: sample.Metric,
@ -1379,7 +1379,7 @@ func funcHistogramStdDev(vals []parser.Value, _ parser.Expressions, enh *EvalNod
variance += cVariance
variance /= sample.H.Count
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropMetricName()
sample.Metric = sample.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: sample.Metric,
@ -1422,7 +1422,7 @@ func funcHistogramStdVar(vals []parser.Value, _ parser.Expressions, enh *EvalNod
variance += cVariance
variance /= sample.H.Count
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropMetricName()
sample.Metric = sample.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: sample.Metric,
@ -1445,7 +1445,7 @@ func funcHistogramFraction(vals []parser.Value, _ parser.Expressions, enh *EvalN
continue
}
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropMetricName()
sample.Metric = sample.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: sample.Metric,
@ -1518,7 +1518,7 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
}
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropMetricName()
sample.Metric = sample.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: sample.Metric,
@ -1536,7 +1536,7 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
}
if !enh.enableDelayedNameRemoval {
mb.metric = mb.metric.DropMetricName()
mb.metric = mb.metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
@ -1754,7 +1754,7 @@ func dateWrapper(vals []parser.Value, enh *EvalNodeHelper, f func(time.Time) flo
}
t := time.Unix(int64(el.F), 0).UTC()
if !enh.enableDelayedNameRemoval {
el.Metric = el.Metric.DropMetricName()
el.Metric = el.Metric.DropMetricIdentity()
}
enh.Out = append(enh.Out, Sample{
Metric: el.Metric,

View file

@ -61,7 +61,7 @@ const (
var symbolTable = labels.NewSymbolTable()
func fuzzParseMetricWithContentType(in []byte, contentType string) int {
p, warning := textparse.New(in, contentType, "", false, false, symbolTable)
p, warning := textparse.New(in, contentType, "", false, false, false, symbolTable)
if p == nil || warning != nil {
// An invalid content type is being passed, which should not happen
// in this context.

View file

@ -87,6 +87,9 @@ type Options struct {
// Option to enable the ingestion of native histograms.
EnableNativeHistogramsIngestion bool
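// EnableTypeAndUnitLabels enables the experimental injection of __type__ and __unit__ labels from scraped metric metadata (type-and-unit-labels feature flag).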
EnableTypeAndUnitLabels bool
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption

View file

@ -195,6 +195,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.convertClassicHistToNHCB,
options.EnableNativeHistogramsIngestion,
options.EnableCreatedTimestampZeroIngestion,
options.EnableTypeAndUnitLabels,
options.ExtraMetrics,
options.AppendMetadata,
opts.target,
@ -916,6 +917,7 @@ type scrapeLoop struct {
// Feature flagged options.
enableNativeHistogramIngestion bool
enableCTZeroIngestion bool
enableTypeAndUnitLabels bool
appender func(ctx context.Context) storage.Appender
symbolTable *labels.SymbolTable
@ -1223,6 +1225,7 @@ func newScrapeLoop(ctx context.Context,
convertClassicHistToNHCB bool,
enableNativeHistogramIngestion bool,
enableCTZeroIngestion bool,
enableTypeAndUnitLabels bool,
reportExtraMetrics bool,
appendMetadataToWAL bool,
target *Target,
@ -1279,6 +1282,7 @@ func newScrapeLoop(ctx context.Context,
convertClassicHistToNHCB: convertClassicHistToNHCB,
enableNativeHistogramIngestion: enableNativeHistogramIngestion,
enableCTZeroIngestion: enableCTZeroIngestion,
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics,
@ -1604,7 +1608,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
return
}
p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable)
p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.enableTypeAndUnitLabels, sl.symbolTable)
if p == nil {
sl.l.Error(
"Failed to determine correct type of scrape target.",

View file

@ -958,6 +958,7 @@ func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper s
false,
false,
false,
false,
true,
nil,
false,
@ -1105,6 +1106,7 @@ func TestScrapeLoopRun(t *testing.T) {
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
@ -1252,6 +1254,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
@ -1978,7 +1981,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
fakeRef := storage.SeriesRef(1)
expValue := float64(1)
metric := []byte(`metric{n="1"} 1`)
p, warning := textparse.New(metric, "text/plain", "", false, false, labels.NewSymbolTable())
p, warning := textparse.New(metric, "text/plain", "", false, false, false, labels.NewSymbolTable())
require.NotNil(t, p)
require.NoError(t, warning)

View file

@ -392,7 +392,7 @@ func TestFederationWithNativeHistograms(t *testing.T) {
require.Equal(t, http.StatusOK, res.Code)
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
p := textparse.NewProtobufParser(body, false, labels.NewSymbolTable())
p := textparse.NewProtobufParser(body, false, false, labels.NewSymbolTable())
var actVec promql.Vector
metricFamilies := 0
l := labels.Labels{}