diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 638082549d..8a4902a8d1 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -68,6 +68,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/semconv" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tracing" @@ -214,6 +215,8 @@ type flagConfig struct { promqlEnableDelayedNameRemoval bool promslogConfig promslog.Config + + enableSemconvVersionedRead bool } // setFeatureListOptions sets the corresponding options from the featureList. @@ -283,6 +286,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "type-and-unit-labels": c.scrape.EnableTypeAndUnitLabels = true logger.Info("Experimental type and unit labels enabled") + case "semconv-versioned-read": + c.enableSemconvVersionedRead = true + logger.Info("Experimental semconv versioned read enabled") default: logger.Warn("Unknown option for --enable-feature", "option", o) } @@ -728,6 +734,10 @@ func main() { fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) + if cfg.enableSemconvVersionedRead { + fanoutStorage = semconv.AwareStorage(fanoutStorage) + } + var ( ctxWeb, cancelWeb = context.WithCancel(context.Background()) ctxRule = context.Background() diff --git a/go.mod b/go.mod index a2abdabf7a..60591bd26e 100644 --- a/go.mod +++ b/go.mod @@ -162,6 +162,7 @@ require ( github.com/knadh/koanf/v2 v2.1.2 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/maruel/natural v1.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mdlayher/socket v0.4.1 // indirect diff --git a/go.sum b/go.sum index b379fb0d25..25e57e4e9f 100644 --- a/go.sum +++ b/go.sum @@ -323,6 +323,8 @@ github.com/linode/linodego v1.47.0 h1:6MFNCyzWbr8Rhl4r7d5DwZLwxvFIsM4ARH6W0KS/R0 github.com/linode/linodego v1.47.0/go.mod h1:vyklQRzZUWhFVBZdYx4dcYJU/gG9yKB9VUcUs6ub0Lk= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/maruel/natural v1.1.1 h1:Hja7XhhmvEFhcByqDoHz9QZbkWey+COd9xWfCfn1ioo= +github.com/maruel/natural v1.1.1/go.mod h1:v+Rfd79xlw1AgVBjbO0BEQmptqb5HvL/k9GRHB7ZKEg= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= diff --git a/semconv/engine.go b/semconv/engine.go new file mode 100644 index 0000000000..c94ab74ac9 --- /dev/null +++ b/semconv/engine.go @@ -0,0 +1,451 @@ +package semconv + +import ( + "fmt" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/maruel/natural" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/model/histogram" + + "github.com/prometheus/prometheus/tsdb/chunkenc" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" +) + +const cacheTTL = 1 * time.Hour + +type schemaEngine struct { + // TODO(bwplotka): Implement GC logic for ttl and limits. + cachedIDs map[string]*ids + cacheIDsMu sync.RWMutex + cachedChangelog map[string]*changelog + cacheChangelogMu sync.RWMutex +} + +func newSchemaEngine() *schemaEngine { + return &schemaEngine{ + cachedIDs: map[string]*ids{}, + cachedChangelog: map[string]*changelog{}, + } +} + +type matcherBuilder struct { + metric labels.MetricIdentity + other []*labels.Matcher +} + +func newMatcherBuilder(matchers []*labels.Matcher) (matcherBuilder, error) { + var b matcherBuilder + for _, m := range matchers { + switch m.Name { + case labels.MetricName: + if m.Type != labels.MatchEqual { + return b, fmt.Errorf("__name__ matcher must be equal") + } + b.metric.Name = m.Value + case "__type__": + if m.Type != labels.MatchEqual { + return b, fmt.Errorf("__type__ matcher must be equal") + } + b.metric.Type = model.MetricType(m.Value) + case "__unit__": + if m.Type != labels.MatchEqual { + return b, fmt.Errorf("__unit__ matcher must be equal") + } + b.metric.Unit = m.Value + case schemaURLLabel: + // Skip it as we will be querying to different versions? We could + // make regex for registry dir at least, if that helps. + default: + b.other = append(b.other, m) + } + } + return b, nil +} + +// ToMatchers returns a copy of matchers based on builder details. +func (b matcherBuilder) ToMatchers(extraNameSuffix string) []*labels.Matcher { + ret := make([]*labels.Matcher, 0, len(b.other)+3) + + if b.metric.Name != "" { + ret = append(ret, &labels.Matcher{ + Name: model.MetricNameLabel, + Type: labels.MatchEqual, + Value: b.metric.Name + extraNameSuffix, + }) + } + if b.metric.Type != "" && b.metric.Type != model.MetricTypeUnknown { + ret = append(ret, &labels.Matcher{ + Name: "__type__", + Type: labels.MatchEqual, + Value: string(b.metric.Type), + }) + } + if b.metric.Unit != "" { + ret = append(ret, &labels.Matcher{ + Name: "__unit__", + Type: labels.MatchEqual, + Value: b.metric.Unit, + }) + } + return append(ret, b.other...) +} + +func (e *schemaEngine) getMetricID(schemaURL string, matchers matcherBuilder) (metricID, string, error) { + schemaVersion := path.Base(schemaURL) + + // TODO(bwplotka): This assumes such a file structure is part of the spec. + ids, err := e.fetchIDs(schemaIDsURL(schemaURL)) + if err != nil { + return "", "", fmt.Errorf("based on __schema_url__=%v; %w", schemaURL, err) + } + + var ( + vid []versionedID + magicSuffix string + ) + for _, suffix := range []string{"", "_bucket", "_count", "_sum"} { + magicSuffix = suffix + m := matchers.metric + m.Name = strings.TrimSuffix(m.Name, magicSuffix) + + var ok bool + vid, ok = ids.MetricsIDs[m.String()] + if !ok { + // Try non-unit search. + val, ok := ids.uniqueNameTypeToIdentity[m.String()] + if !ok { + // Try just name search. + val, ok = ids.uniqueNameToIdentity[m.Name] + if !ok { + // Try different suffix. + continue + } + } + if val == "" { + return "", "", fmt.Errorf("ambigous metric ID lookup for %v metric; use __type__ and __unit__ for more specific selection", m.String()) + } + vid = ids.MetricsIDs[val] + break + } + } + if len(vid) == 0 { + return "", "", fmt.Errorf("can't find metric ID in %v entry for version %v; this metric (with or without magic suffixes) is not part of this schema registry", matchers.metric.String(), schemaVersion) + } + + for _, id := range vid { + if !natural.Less(id.IntroVersion, schemaVersion) { + return id.ID, magicSuffix, nil + } + } + return "", "", fmt.Errorf("can't find metric ID in %v entry for version %v", matchers.metric.String(), schemaVersion) +} + +func (e *schemaEngine) fetchIDs(schemaIDsURL string) (_ *ids, err error) { + e.cacheIDsMu.RLock() + ids, ok := e.cachedIDs[schemaIDsURL] + e.cacheIDsMu.RUnlock() + if ok && time.Now().Sub(ids.fetchTime) < cacheTTL { + return ids, nil + } + // Expired or missing. + ids, err = fetchIDs(schemaIDsURL) + if err != nil { + return nil, err + } + e.cacheIDsMu.Lock() + e.cachedIDs[schemaIDsURL] = ids + e.cacheIDsMu.Unlock() + return ids, nil +} + +func (e *schemaEngine) fetchChangelog(schemaChangelogURL string) (_ *changelog, err error) { + e.cacheChangelogMu.RLock() + ch, ok := e.cachedChangelog[schemaChangelogURL] + e.cacheChangelogMu.RUnlock() + if ok && time.Now().Sub(ch.fetchTime) < cacheTTL { + return ch, nil + } + // Expired or missing. + ch, err = fetchChangelog(schemaChangelogURL) + if err != nil { + return nil, err + } + e.cacheChangelogMu.Lock() + e.cachedChangelog[schemaChangelogURL] = ch + e.cacheChangelogMu.Unlock() + return ch, nil +} + +func schemaChangelogURL(schemaURL string) string { + // NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/ + dir, _ := path.Split(schemaURL) + return fmt.Sprintf("%v/latest/.gen/changelog.yaml", dir) +} + +func schemaIDsURL(schemaURL string) string { + // NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/ + dir, _ := path.Split(schemaURL) + return fmt.Sprintf("%v/latest/.gen/ids.yaml", dir) +} + +// FindVariants returns all variants for a single schematized (referenced by schema_url) metric. +// It returns error if the given matchers does not point to a single metric or if schema or variants couldn't +// be detected. +func (e *schemaEngine) FindVariants(schemaURL string, originalMatchers []*labels.Matcher) (variants []*variant, _ error) { + matchers, err := newMatcherBuilder(originalMatchers) + if err != nil { + return nil, err + } + + mID, magicSuffix, err := e.getMetricID(schemaURL, matchers) + if err != nil { + return nil, fmt.Errorf("getMetricID: %w", err) + } + ch, err := e.fetchChangelog(schemaChangelogURL(schemaURL)) + if err != nil { + return nil, err + } + sID, rev := mID.semanticID() + changes, ok := ch.MetricsChangelog[sID] + if !ok { + return nil, fmt.Errorf("schema is malformed or cache is not consistent; can't find changes for semantic ID %v", sID) + } + + variants = append(variants, &variant{ + matchers: matchers.ToMatchers(""), + }) + if len(changes) == 0 { + // No changes, only one variant--the original metric. + return nil, nil + } + + // Revision starts with 0, then 2,3,4... + if rev != 0 { + rev-- + } + + // Changelog contains changes across revisions, traverse up and down. + variants, err = traverseChanges(changes, rev, true, matchers, magicSuffix, variants) + if err != nil { + return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err) + } + variants, err = traverseChanges(changes, rev, false, matchers, magicSuffix, variants) + if err != nil { + return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err) + } + return variants, nil +} + +type resultTransform struct { + to metricGroupChange + from metricGroupChange + vt *valueTransformer + magicSuffix string +} + +type variant struct { + matchers []*labels.Matcher + result resultTransform +} + +// TODO(bwplotka): Fix known gap - we have to chain to's and from's for more traversal lenght than 1 (similar to what we do with matchers). +func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magicSuffix string, v []*variant) ([]*variant, error) { + var to, from metricGroupChange + if up { + if len(changes) <= rev { + return v, nil + } + to = changes[rev].Forward + from = changes[rev].Backward + rev++ // up starts with the current rev, and then goes up. + } else { + rev-- + if rev < 0 { + return v, nil + } + to = changes[rev].Backward + from = changes[rev].Forward + } + + // Transform matchers. + if to.MetricName != "" { + b.metric.Name = to.MetricName + } + if to.Unit != "" { + b.metric.Unit = to.DirectUnit() + } + for a := range to.Attributes { + aTo := to.Attributes[a] + aFrom := from.Attributes[a] + // TODO(bwplotka): In current logic, tag MUST be specified, + // otherwise the engine would need to fetch full metric definition and + // to get the tag -> ID of attribute (or separate attribute tag to IDs index). + for m := range b.other { + // Find the attribute under the "old" name. + if b.other[m].Name == aFrom.Tag { + old := b.other[m] + value := b.other[m].Value + for member := range aTo.Members { + if old.Matches(aFrom.Members[member].Value) { + // TODO(bwplotka): Pretty yolo e.g. should we also replace partial use in regex? + value = strings.Replace(value, aFrom.Members[member].Value, aTo.Members[member].Value, -1) + } + } + b.other[m] = labels.MustNewMatcher(old.Type, aTo.Tag, value) + break + } + } + } + + var vt *valueTransformer + if from.ValuePromQL != "" { + var err error + vt, err = newValueTransformer(from.ValuePromQL) + if err != nil { + return nil, err + } + } + + return traverseChanges(changes, rev, up, b, magicSuffix, append(v, &variant{ + matchers: b.ToMatchers(magicSuffix), + result: resultTransform{ + // Transformation from -> to is for matchers, for results we need to revert that + // transformation, so below code uses to -> from. + from: to, + to: from, + vt: vt, + magicSuffix: magicSuffix, + }, + })) +} + +type transformingSeriesSet struct { + storage.SeriesSet + + result resultTransform +} + +// SeriesSet returns variant SeriesSet that transforms data on the fly +// based on variant to and from change details. +func (v *variant) SeriesSet(s storage.SeriesSet) storage.SeriesSet { + return &transformingSeriesSet{SeriesSet: s, result: v.result} +} + +type transformingSeries struct { + storage.Series + + lbls labels.Labels + + result resultTransform +} + +func (s *transformingSeriesSet) At() storage.Series { + at := s.SeriesSet.At() + return &transformingSeries{Series: at, lbls: at.Labels(), result: s.result} +} + +func (s *transformingSeries) Labels() labels.Labels { + typ := s.lbls.MetricIdentity().Type + + builder := labels.NewBuilder(s.lbls) + builder.Range(func(l labels.Label) { + + nameswitch: + switch l.Name { + case labels.MetricName: + if s.result.to.MetricName != "" { + builder.Set(l.Name, s.result.to.MetricName+s.result.magicSuffix) + } + case "__type__": + return + case schemaURLLabel: + // Explicitly remove __schema_url__ as that would be misleading. + builder.Del(l.Name) + case "__unit__": + if s.result.to.Unit != "" { + builder.Set(l.Name, strings.Trim(s.result.to.Unit, "{}")) + } + case "le": + if typ == model.MetricTypeHistogram { + val, err := strconv.ParseFloat(l.Value, 64) + if err != nil { + fmt.Println("ERROR", err) + } + builder.Set(l.Name, model.FloatString(s.result.vt.Transform(val)).String()) + } + default: + for a := range s.result.to.Attributes { + if l.Name != s.result.from.Attributes[a].Tag { + continue + } + builder.Del(l.Name) + for m := range s.result.from.Attributes[a].Members { + if l.Value != s.result.from.Attributes[a].Members[m].Value { + continue + } + builder.Set(s.result.to.Attributes[a].Tag, s.result.to.Attributes[a].Members[m].Value) + break nameswitch + } + builder.Set(s.result.to.Attributes[a].Tag, l.Value) + break nameswitch + } + } + }) + return builder.Labels() +} + +type transformingIterator struct { + chunkenc.Iterator + + typ model.MetricType + result resultTransform +} + +func (s *transformingSeries) Iterator(i chunkenc.Iterator) chunkenc.Iterator { + return &transformingIterator{Iterator: s.Series.Iterator(i), typ: s.lbls.MetricIdentity().Type, result: s.result} +} + +func (i *transformingIterator) At() (int64, float64) { + t, v := i.Iterator.At() + // TODO(bwplotka): Do the same for summaries. + if i.typ == model.MetricTypeHistogram && (i.result.magicSuffix == "_count" || i.result.magicSuffix == "_bucket") { + return t, v + } + return t, i.result.vt.Transform(v) +} + +func (i *transformingIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + t, hist := i.Iterator.AtHistogram(h) + // TODO: You can't really scale native histograms with exponential scheme. Handle this (error, approx, validation). + + if hist.UsesCustomBuckets() { + hist = hist.Copy() + hist.Sum = i.result.vt.Transform(hist.Sum) + for cvi := range hist.CustomValues { + hist.CustomValues[cvi] = i.result.vt.Transform(hist.CustomValues[cvi]) + } + } + return t, hist +} + +func (i *transformingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + t, hist := i.Iterator.AtFloatHistogram(fh) + // TODO: You can't really scale native histograms with exponential scheme. Handle this (error, approx, validation). + + if hist.UsesCustomBuckets() { + hist = hist.Copy() + hist.Sum = i.result.vt.Transform(hist.Sum) + for cvi := range hist.CustomValues { + hist.CustomValues[cvi] = i.result.vt.Transform(hist.CustomValues[cvi]) + } + } + return t, hist +} diff --git a/semconv/engine_test.go b/semconv/engine_test.go new file mode 100644 index 0000000000..16ff3049c6 --- /dev/null +++ b/semconv/engine_test.go @@ -0,0 +1,127 @@ +package semconv + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestEngine_FindVariants(t *testing.T) { + for _, tcase := range []struct { + schemaURL string + matchers []*labels.Matcher + + expectedVariants []*variant + expectedErr error + }{ + // TODO(bwplotka): Add only original variant case. + { + schemaURL: "./testdata/v1.1.0", matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/v1.1.0"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + expectedVariants: []*variant{ + { + // Original. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "__unit__", "milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + result: resultTransform{ + to: testdataLatencyChanges[0].Forward, + from: testdataLatencyChanges[0].Backward, + }, + }, + }, + }, + { + schemaURL: "./testdata/v1.0.0", matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/v1.0.0"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + expectedVariants: []*variant{ + { + // Original. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), + labels.MustNewMatcher(labels.MatchEqual, "__unit__", "seconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + result: resultTransform{ + to: testdataLatencyChanges[0].Backward, + from: testdataLatencyChanges[0].Forward, + }, + }, + }, + }, + // TODO(bwplotka): Test ambiguous matcher errors etc. + { + schemaURL: "./testdata/v1.1.0", matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/v1.1.0"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_changed_total"), + labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"), + }, + expectedVariants: []*variant{ + { + // Original. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_changed_total"), + labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"), + }, + }, + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_total"), + labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"), + }, + result: resultTransform{ + to: testdataElementsChanges[0].Forward, + from: testdataElementsChanges[0].Backward, + }, + }, + }, + }, + } { + + t.Run("", func(t *testing.T) { + e := newSchemaEngine() + got, err := e.FindVariants(tcase.schemaURL, tcase.matchers) + if tcase.expectedErr != nil { + require.ErrorContains(t, err, tcase.expectedErr.Error()) + return + } + require.NoError(t, err) + testutil.RequireEqualWithOptions(t, tcase.expectedVariants, got, []cmp.Option{ + cmp.Comparer(func(a, b *labels.Matcher) bool { + return a.Name == b.Name && a.Type == b.Type && a.Value == b.Value + }), + cmp.AllowUnexported(variant{}), + }) + }) + } +} diff --git a/semconv/gen_files.go b/semconv/gen_files.go new file mode 100644 index 0000000000..2b1551909f --- /dev/null +++ b/semconv/gen_files.go @@ -0,0 +1,160 @@ +package semconv + +import ( + "fmt" + "io" + "net/http" + "os" + "strconv" + "strings" + "time" + + "gopkg.in/yaml.v3" +) + +type metricID string + +type semanticMetricID string + +func (id metricID) semanticID() (_ semanticMetricID, revision int) { + parts := strings.Split(string(id), ".") + if len(parts) == 0 { + return semanticMetricID(id), 0 + } + var err error + revision, err = strconv.Atoi(parts[len(parts)-1]) + if err != nil { + return semanticMetricID(id), 0 + } + // Number, assume revision. + return semanticMetricID(strings.Join(parts[:len(parts)-1], ".")), revision +} + +type changelog struct { + Version int `yaml:"version"` + + MetricsChangelog map[semanticMetricID][]change `yaml:"metrics_changelog"` + + fetchTime time.Time +} + +type change struct { + Forward metricGroupChange + Backward metricGroupChange +} + +// metricGroupChange represents semconv metric group. +// NOTE(bwplotka): Only implementing fields that matter for querying. +type metricGroupChange struct { + MetricName string `yaml:"metric_name"` + Unit string `yaml:"unit"` + ValuePromQL string `yaml:"value_promql"` + Attributes []attribute `yaml:"attributes"` +} + +func (m metricGroupChange) DirectUnit() string { + return strings.TrimSuffix(strings.TrimPrefix(m.Unit, "{"), "}") +} + +type attribute struct { + Tag string `yaml:"tag"` + Members []attributeMember `yaml:"members"` +} + +type attributeMember struct { + Value string `yaml:"value"` +} + +func fetchChangelog(schemaChangelogURL string) (_ *changelog, err error) { + ch := &changelog{} + if err := fetchAndUnmarshal(schemaChangelogURL, ch); err != nil { + return nil, err + } + return ch, nil +} + +type ids struct { + Version int `yaml:"version"` + + MetricsIDs map[string][]versionedID `yaml:"metrics_ids"` + uniqueNameToIdentity map[string]string + uniqueNameTypeToIdentity map[string]string + + fetchTime time.Time +} + +func fetchIDs(schemaIDsURL string) (_ *ids, err error) { + i := &ids{ + uniqueNameTypeToIdentity: make(map[string]string), + uniqueNameToIdentity: make(map[string]string), + } + if err := fetchAndUnmarshal(schemaIDsURL, i); err != nil { + return nil, err + } + + for id := range i.MetricsIDs { + var name, nameType string + // Parse identity in a form of name~unit.type or name.type. + parts := strings.Split(id, "~") + if len(parts) > 1 { + name = parts[0] + + unitAndType := strings.TrimPrefix(id, name+"~") + parts = strings.Split(unitAndType, ".") + nameType = name + "." + parts[len(parts)-1] + } else { + parts := strings.Split(id, ".") + name = parts[0] + nameType = id + } + + if _, ok := i.uniqueNameToIdentity[name]; ok { + // Not unique, put sentinel "" which will trigger error. + i.uniqueNameToIdentity[name] = "" + } else { + i.uniqueNameToIdentity[name] = id + } + + if _, ok := i.uniqueNameTypeToIdentity[nameType]; ok { + // Not unique, put sentinel "" which will trigger error. + i.uniqueNameTypeToIdentity[nameType] = "" + } else { + i.uniqueNameTypeToIdentity[nameType] = id + } + } + return i, nil +} + +type versionedID struct { + ID metricID `yaml:"id"` + IntroVersion string `yaml:"intro_version"` +} + +func fetchAndUnmarshal[T any](url string, out *T) (err error) { + var b []byte + if strings.HasPrefix(url, "http") { + resp, err := http.Get(url) + if err != nil { + return fmt.Errorf("http fetch %v: %w", url, err) + } + if resp.StatusCode/100 != 2 { + // TODO(bwplotka): Print potential body? + return fmt.Errorf("http fetch %v, got non-200 status: %v", url, resp.StatusCode) + } + + // TODO(bwplotka): Add limit. + b, err = io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read all from http %v: %w", url, err) + } + } else { + b, err = os.ReadFile(url) + if err != nil { + return fmt.Errorf("read all from file %v: %w", url, err) + } + } + if err := yaml.Unmarshal(b, out); err != nil { + return err + } + return nil +} diff --git a/semconv/gen_files_test.go b/semconv/gen_files_test.go new file mode 100644 index 0000000000..41350b2cb7 --- /dev/null +++ b/semconv/gen_files_test.go @@ -0,0 +1,125 @@ +package semconv + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +var ( + testdataElementsChanges = []change{ + { + Forward: metricGroupChange{MetricName: "my_app_custom_elements_changed_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}, {Tag: "class", Members: []attributeMember{{Value: "FIRST"}, {Value: "SECOND"}, {Value: "OTHER"}}}}}, + Backward: metricGroupChange{MetricName: "my_app_custom_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "integer"}, {Tag: "category", Members: []attributeMember{{Value: "first"}, {Value: "second"}, {Value: "other"}}}}}, + }, + } + testdataLatencyChanges = []change{ + { + Forward: metricGroupChange{MetricName: "my_app_latency_seconds_total", Unit: "{seconds}", ValuePromQL: "$old / 1000"}, + Backward: metricGroupChange{MetricName: "my_app_latency_milliseconds_total", Unit: "{milliseconds}", ValuePromQL: "$new * 1000"}, + }, + } +) + +func TestFetchChangelog(t *testing.T) { + expected := &changelog{ + Version: 1, + MetricsChangelog: map[semanticMetricID][]change{ + "my_app_custom_elements": testdataElementsChanges, + "my_app_latency": testdataLatencyChanges, + "my_app_some_elements_totals": nil, + }, + } + + t.Run("local", func(t *testing.T) { + got, err := fetchChangelog("./testdata/latest/.gen/changelog.yaml") + require.NoError(t, err) + require.Equal(t, expected, got) + }) + // TODO(bwplotka): Move to something Prometheus owns e.g. internal Prometheus repo path. + t.Run("http", func(t *testing.T) { + got, err := fetchChangelog("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/v1.1.0/.gen/changelog.yaml") + require.NoError(t, err) + require.Equal(t, expected, got) + }) + t.Run("http-custom", func(t *testing.T) { + got, err := fetchChangelog("https://bwplotka.dev/semconv/latest/.gen/changelog.yaml") + require.NoError(t, err) + require.Equal(t, expected, got) + }) +} + +func TestFetchIDs(t *testing.T) { + expected := &ids{ + Version: 1, + MetricsIDs: map[string][]versionedID{ + "my_app_custom_elements_changed_total.counter": { + { + ID: "my_app_custom_elements.2", + IntroVersion: "v1.1.0", + }, + }, + "my_app_custom_elements_total.counter": { + { + ID: "my_app_custom_elements", + IntroVersion: "v1.0.0", + }, + }, + "my_app_latency_milliseconds_total~milliseconds.histogram": { + { + ID: "my_app_latency", + IntroVersion: "v1.0.0", + }, + }, + "my_app_latency_seconds_total~seconds.histogram": { + {ID: "my_app_latency.2", + IntroVersion: "v1.1.0"}, + }, + "my_app_some_elements_totals~gauge": { + { + ID: "my_app_some_elements", + IntroVersion: "v1.0.0", + }, + }, + }, + uniqueNameToIdentity: map[string]string{ + "my_app_custom_elements_changed_total": "my_app_custom_elements_changed_total.counter", + "my_app_custom_elements_total": "my_app_custom_elements_total.counter", + "my_app_latency_milliseconds_total": "my_app_latency_milliseconds_total~milliseconds.histogram", + "my_app_latency_seconds_total": "my_app_latency_seconds_total~seconds.histogram", + "my_app_some_elements_totals": "my_app_some_elements_totals.gauge", + }, + uniqueNameTypeToIdentity: map[string]string{ + "my_app_custom_elements_changed_total.counter": "my_app_custom_elements_changed_total.counter", + "my_app_custom_elements_total.counter": "my_app_custom_elements_total.counter", + "my_app_latency_milliseconds_total.histogram": "my_app_latency_milliseconds_total~milliseconds.histogram", + "my_app_latency_seconds_total.histogram": "my_app_latency_seconds_total~seconds.histogram", + "my_app_some_elements_totals.gauge": "my_app_some_elements_totals.gauge", + }, + } + + t.Run("local", func(t *testing.T) { + got, err := fetchIDs("./testdata/latest/.gen/ids.yaml") + require.NoError(t, err) + require.Equal(t, expected, got) + }) + t.Run("http", func(t *testing.T) { + got, err := fetchIDs("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/latest/.gen/ids.yaml") + require.NoError(t, err) + require.Equal(t, expected, got) + }) +} + +func TestSemanticMetricID(t *testing.T) { + gotID, gotRev := metricID("my_app_custom_elements").semanticID() + require.Equal(t, semanticMetricID("my_app_custom_elements"), gotID) + require.Equal(t, 0, gotRev) + + gotID, gotRev = metricID("my_app_custom_elements.yolo").semanticID() + require.Equal(t, semanticMetricID("my_app_custom_elements.yolo"), gotID) + require.Equal(t, 0, gotRev) + + gotID, gotRev = metricID("my_app_custom_elements.2").semanticID() + require.Equal(t, semanticMetricID("my_app_custom_elements"), gotID) + require.Equal(t, 2, gotRev) +} diff --git a/semconv/storage.go b/semconv/storage.go new file mode 100644 index 0000000000..b71bec17ef --- /dev/null +++ b/semconv/storage.go @@ -0,0 +1,117 @@ +package semconv + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" +) + +const schemaURLLabel = "__schema_url__" + +// AwareStorage wraps given storage with a semconv awareness that +// performs versioned read when __schema_url__ matcher is provided. +// TODO(bwplotka): Technically we only need Querier? +func AwareStorage(s storage.Storage) storage.Storage { + return &awareStorage{Storage: s, engine: newSchemaEngine()} +} + +type awareStorage struct { + storage.Storage + + engine *schemaEngine +} + +type awareQuerier struct { + storage.Querier + + engine *schemaEngine +} + +func (s *awareStorage) Querier(mint, maxt int64) (storage.Querier, error) { + q, err := s.Storage.Querier(mint, maxt) + if err != nil { + return nil, err + } + return &awareQuerier{Querier: q, engine: s.engine}, nil +} + +type annotatedSeriesSet struct { + storage.SeriesSet + + warning string +} + +func annotateSeriesSet(s storage.SeriesSet, warning string) storage.SeriesSet { + return &annotatedSeriesSet{warning: warning, SeriesSet: s} +} + +func (s *annotatedSeriesSet) Warnings() annotations.Annotations { + got := s.SeriesSet.Warnings() + return got.Add(errors.New(s.warning)) +} + +func (q *awareQuerier) Select(ctx context.Context, sort bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + var schemaURL string + + for _, m := range matchers { + if m.Name != schemaURLLabel { + continue + } + if schemaURL != "" { + return annotateSeriesSet( + q.Querier.Select(ctx, sort, hints, matchers...), + fmt.Sprintf("schema: __schema_url__ matcher was used more than once, schematization logic is skipped for %v", matchers), + ) + } + if m.Type != labels.MatchEqual { + return annotateSeriesSet( + q.Querier.Select(ctx, sort, hints, matchers...), + fmt.Sprintf("schema: __schema_url__ matcher is ambigious (not equal type), schematization logic is skipped for %v", matchers), + ) + } + schemaURL = m.Value + } + if schemaURL == "" { + return q.Querier.Select(ctx, sort, hints, matchers...) + } + + variants, err := q.engine.FindVariants(schemaURL, matchers) + if err != nil { + return annotateSeriesSet( + q.Querier.Select(ctx, sort, hints, matchers...), + fmt.Errorf("schema: failed to find variants %w, schematization logic is skipped for %v", err, matchers).Error(), + ) + } + + var ( + wg sync.WaitGroup + seriesSetChan = make(chan storage.SeriesSet) + seriesSet = make([]storage.SeriesSet, 0, len(variants)) + ) + + // TODO(bwplotka): Async limit? + // Lookup alternative variants. + for _, v := range variants { + wg.Add(1) + go func(m []*labels.Matcher) { + defer wg.Done() + + // We need to sort for NewMergeSeriesSet to work. + seriesSetChan <- v.SeriesSet(q.Querier.Select(ctx, true, hints, m...)) + }(v.matchers) + } + go func() { + wg.Wait() + close(seriesSetChan) + }() + + for r := range seriesSetChan { + seriesSet = append(seriesSet, r) + } + return storage.NewMergeSeriesSet(seriesSet, 0, storage.ChainedSeriesMerge) +} diff --git a/semconv/storage_test.go b/semconv/storage_test.go new file mode 100644 index 0000000000..b3cb1940b7 --- /dev/null +++ b/semconv/storage_test.go @@ -0,0 +1,611 @@ +package semconv + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" +) + +func testSchemaURL(version string) string { + //return "https://bwplotka.dev/semconv/" + version + return "./testdata/" + version +} + +var ( + testdataElementsSeriesOld = labels.FromStrings( + "__name__", "my_app_custom_elements_total", + "__schema_url__", testSchemaURL("v1.0.0"), + "__type__", "counter", + "integer", "1", + "category", "first", + "fraction", "1.243", + "test", "old", + ) + testdataElementsSeriesNew = labels.FromStrings( + "__name__", "my_app_custom_elements_changed_total", + "__schema_url__", testSchemaURL("v1.1.0"), + "__type__", "counter", + "number", "1", + "class", "FIRST", + "fraction", "1.243", + "test", "new", + ) + testdataLatencySeriesOld = labels.FromStrings( + "__name__", "my_app_latency_milliseconds", + "__schema_url__", testSchemaURL("v1.0.0"), + "__type__", "histogram", + "__unit__", "milliseconds", + "code", "200", + "test", "old", + ) + testdataLatencySeriesNew = labels.FromStrings( + "__name__", "my_app_latency_seconds", + "__schema_url__", testSchemaURL("v1.1.0"), + "__type__", "histogram", + "__unit__", "seconds", + "code", "200", + "test", "new", + ) +) + +type appendSeries struct { + series labels.Labels + samples []chunks.Sample +} + +func openTestDB(t testing.TB, opts *tsdb.Options, dataToAppend []appendSeries) (db *tsdb.DB) { + t.Helper() + + tmpdir := t.TempDir() + if opts == nil { + opts = tsdb.DefaultOptions() + } + opts.EnableNativeHistograms = true + + db, err := tsdb.Open(tmpdir, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { + _ = db.Close + }) + + // Append test data. + ctx := context.Background() + app := db.Appender(ctx) + for _, a := range dataToAppend { + for _, s := range a.samples { + if s.H() != nil || s.FH() != nil { + _, err = app.AppendHistogram(0, a.series, s.T(), s.H(), nil) + require.NoError(t, err) + } else { + _, err = app.Append(0, a.series, s.T(), s.F()) + require.NoError(t, err) + } + } + } + require.NoError(t, app.Commit()) + return db +} + +func testNHCB(i int) *histogram.Histogram { + return &histogram.Histogram{ + Schema: histogram.CustomBucketsSchema, + CounterResetHint: func() histogram.CounterResetHint { + if i == 0 { + return 0 + } + return 2 + }(), + Count: 10 + uint64(i), + Sum: 2.7 + float64(i), + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0 + int64(i)}, + CustomValues: []float64{5, 10, 20, 50, 100, 500}, + } +} + +type sample struct { + t int64 + f float64 + h *histogram.Histogram + fh *histogram.FloatHistogram +} + +func newSample(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample { + return sample{t, v, h, fh} +} + +func (s sample) T() int64 { return s.t } +func (s sample) F() float64 { return s.f } +func (s sample) H() *histogram.Histogram { return s.h } +func (s sample) FH() *histogram.FloatHistogram { return s.fh } + +func (s sample) Type() chunkenc.ValueType { + switch { + case s.h != nil: + return chunkenc.ValHistogram + case s.fh != nil: + return chunkenc.ValFloatHistogram + default: + return chunkenc.ValFloat + } +} + +func (s sample) Copy() chunks.Sample { + c := sample{t: s.t, f: s.f} + if s.h != nil { + c.h = s.h.Copy() + } + if s.fh != nil { + c.fh = s.fh.Copy() + } + return c +} + +func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample { + t.Helper() + + ss := q.Select(context.Background(), false, nil, matchers...) + + var it chunkenc.Iterator + result := map[string][]chunks.Sample{} + for ss.Next() { + series := ss.At() + + it = series.Iterator(it) + samples, err := storage.ExpandSamples(it, newSample) + require.NoError(t, err) + require.NoError(t, it.Err()) + + if len(samples) == 0 { + continue + } + + name := series.Labels().String() + result[name] = samples + } + require.NoError(t, ss.Err()) + require.Empty(t, ss.Warnings()) + + return result +} + +func scaleSamples(samples []chunks.Sample, up bool, value float64) []chunks.Sample { + ret := make([]chunks.Sample, len(samples)) + for i, s := range samples { + if fh := s.FH(); fh != nil { + if !fh.UsesCustomBuckets() { + panic("can't scale native histograms ") + } + fh = fh.Copy() + if up { + fh.Sum = fh.Sum * value + for cvi := range fh.CustomValues { + fh.CustomValues[cvi] = fh.CustomValues[cvi] * value + } + } else { + fh.Sum = fh.Sum / value + for cvi := range fh.CustomValues { + fh.CustomValues[cvi] = fh.CustomValues[cvi] / value + } + } + ret[i] = sample{t: s.T(), fh: fh} + continue + } + if h := s.H(); h != nil { + if !h.UsesCustomBuckets() { + panic("can't scale native histograms ") + } + h = h.Copy() + if up { + h.Sum = h.Sum * value + for cvi := range h.CustomValues { + h.CustomValues[cvi] = h.CustomValues[cvi] * value + } + } else { + h.Sum = h.Sum / value + for cvi := range h.CustomValues { + h.CustomValues[cvi] = h.CustomValues[cvi] / value + } + } + ret[i] = sample{t: s.T(), h: h} + continue + } + if up { + ret[i] = sample{t: s.T(), f: s.F() * value} + } else { + ret[i] = sample{t: s.T(), f: s.F() / value} + } + } + return ret +} + +func TestScaleSamples(t *testing.T) { + t.Run("nhcb", func(t *testing.T) { + testNHCBSamples := make([]chunks.Sample, 10) + for i := range 10 { + testNHCBSamples[i] = sample{ + t: int64(i), + h: testNHCB(i), + } + } + scaled := scaleSamples(testNHCBSamples, true, 1000) + require.NotEqual(t, testNHCBSamples, scaled) + }) +} + +func TestAwareStorage(t *testing.T) { + const samples = 10 + + testFSamples := make([]chunks.Sample, samples) + for i := range samples { + testFSamples[i] = sample{ + t: int64(i), + f: float64(i), + } + } + testNHCBSamples := make([]chunks.Sample, samples) + for i := range samples { + testNHCBSamples[i] = sample{ + t: int64(i), + h: testNHCB(i), + } + } + + t.Run("counter", func(t *testing.T) { + db := openTestDB(t, nil, []appendSeries{ + {series: testdataElementsSeriesOld, samples: testFSamples}, + {series: testdataElementsSeriesNew, samples: testFSamples}, + }) + + notAware, err := db.Querier(0, samples) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, notAware.Close()) + }) + aware, err := AwareStorage(db).Querier(0, samples) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, aware.Close()) + }) + + t.Run("backward", func(t *testing.T) { + onlyNewResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesNew.Get("fraction")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_custom_elements_changed_total", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="counter", class="FIRST", fraction="1.243", number="1", test="new"}`: testFSamples, + }, onlyNewResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesNew.Get("fraction")), + ) + require.Equal(t, onlyNewResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesNew.Get(schemaURLLabel)), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesNew.Get("fraction")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_custom_elements_changed_total", __type__="counter", class="FIRST", fraction="1.243", number="1", test="new"}`: testFSamples, + `{__name__="my_app_custom_elements_changed_total", __type__="counter", class="FIRST", fraction="1.243", number="1", test="old"}`: testFSamples, + }, compatibleResult) + }) + t.Run("forward", func(t *testing.T) { + onlyOldResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesOld.Get("fraction")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_custom_elements_total", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="counter", category="first", fraction="1.243", integer="1", test="old"}`: testFSamples, + }, onlyOldResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesOld.Get("fraction")), + ) + require.Equal(t, onlyOldResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesOld.Get("fraction")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_custom_elements_total", __type__="counter", category="first", fraction="1.243", integer="1", test="old"}`: testFSamples, + `{__name__="my_app_custom_elements_total", __type__="counter", category="first", fraction="1.243", integer="1", test="new"}`: testFSamples, + }, compatibleResult) + }) + }) + t.Run("classic histogram", func(t *testing.T) { + var a []appendSeries + for _, m := range []labels.Labels{testdataLatencySeriesNew, testdataLatencySeriesOld} { + b := labels.NewBuilder(m) + if m.Get("test") == "new" { + b.Set("le", "10") + } else { + b.Set("le", "10000") + } + + b.Set("__name__", m.MetricIdentity().Name+"_bucket") + a = append(a, appendSeries{series: b.Labels(), samples: testFSamples}) + b = labels.NewBuilder(m) + b.Set("__name__", m.MetricIdentity().Name+"_count") + a = append(a, appendSeries{series: b.Labels(), samples: testFSamples}) + b = labels.NewBuilder(m) + b.Set("__name__", m.MetricIdentity().Name+"_sum") + a = append(a, appendSeries{series: b.Labels(), samples: testFSamples}) + } + db := openTestDB(t, nil, a) + + notAware, err := db.Querier(0, samples) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, notAware.Close()) + }) + aware, err := AwareStorage(db).Querier(0, samples) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, aware.Close()) + }) + + t.Run("backward", func(t *testing.T) { + t.Run("_bucket", func(t *testing.T) { + onlyNewResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds_bucket", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", le="10", test="new"}`: testFSamples, + }, onlyNewResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, onlyNewResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds_bucket", __type__="histogram", __unit__="seconds", code="200", le="10", test="new"}`: testFSamples, + `{__name__="my_app_latency_seconds_bucket", __type__="histogram", __unit__="seconds", code="200", le="10", test="old"}`: testFSamples, + }, compatibleResult) + }) + t.Run("_count", func(t *testing.T) { + onlyNewResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds_count", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum + }, onlyNewResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, onlyNewResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds_count", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, + `{__name__="my_app_latency_seconds_count", __type__="histogram", __unit__="seconds", code="200", test="old"}`: testFSamples, + }, compatibleResult) + }) + t.Run("_sum", func(t *testing.T) { + onlyNewResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds_sum", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum + }, onlyNewResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, onlyNewResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds_sum", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, + `{__name__="my_app_latency_seconds_sum", __type__="histogram", __unit__="seconds", code="200", test="old"}`: scaleSamples(testFSamples, false, 1000), + }, compatibleResult) + }) + }) + t.Run("forward", func(t *testing.T) { + t.Run("_bucket", func(t *testing.T) { + onlyOldResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds_bucket", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="old"}`: testFSamples, + }, onlyOldResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, onlyOldResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds_bucket", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="new"}`: testFSamples, + `{__name__="my_app_latency_milliseconds_bucket", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="old"}`: testFSamples, + }, compatibleResult) + }) + t.Run("_count", func(t *testing.T) { + onlyOldResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds_count", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum + }, onlyOldResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, onlyOldResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds_count", __type__="histogram", __unit__="milliseconds", code="200", test="new"}`: testFSamples, + `{__name__="my_app_latency_milliseconds_count", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, + }, compatibleResult) + }) + t.Run("_sum", func(t *testing.T) { + onlyOldResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds_sum", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum + }, onlyOldResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, onlyOldResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"), + labels.MustNewMatcher(labels.MatchEqual, "code", "200"), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds_sum", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, + `{__name__="my_app_latency_milliseconds_sum", __type__="histogram", __unit__="milliseconds", code="200", test="new"}`: scaleSamples(testFSamples, true, 1000), + }, compatibleResult) + }) + }) + }) + t.Run("native histogram", func(t *testing.T) { + db := openTestDB(t, nil, []appendSeries{ + {series: testdataLatencySeriesOld, samples: testNHCBSamples}, + {series: testdataLatencySeriesNew, samples: testNHCBSamples}, + }) + + notAware, err := db.Querier(0, samples) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, notAware.Close()) + }) + aware, err := AwareStorage(db).Querier(0, samples) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, aware.Close()) + }) + + t.Run("backward", func(t *testing.T) { + onlyNewResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testNHCBSamples, + }, onlyNewResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")), + ) + require.Equal(t, onlyNewResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_seconds", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testNHCBSamples, + `{__name__="my_app_latency_seconds", __type__="histogram", __unit__="seconds", code="200", test="old"}`: scaleSamples(testNHCBSamples, false, 1000), + }, compatibleResult) + }) + t.Run("forward", func(t *testing.T) { + onlyOldResult := query(t, notAware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testNHCBSamples, + }, onlyOldResult) + got := query(t, aware, + // Without schema selector, semconv aware storage should have no effect. + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")), + ) + require.Equal(t, onlyOldResult, got) + + compatibleResult := query(t, aware, + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name), + labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")), + ) + require.Equal(t, map[string][]chunks.Sample{ + `{__name__="my_app_latency_milliseconds", __type__="histogram", __unit__="milliseconds", code="200", test="new"}`: scaleSamples(testNHCBSamples, true, 1000), + `{__name__="my_app_latency_milliseconds", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testNHCBSamples, + }, compatibleResult) + }) + }) +} diff --git a/semconv/testdata/latest/.gen/changelog.yaml b/semconv/testdata/latest/.gen/changelog.yaml new file mode 100644 index 0000000000..3659ec3aa8 --- /dev/null +++ b/semconv/testdata/latest/.gen/changelog.yaml @@ -0,0 +1,36 @@ +# Version of this file. +version: 1 + +# changelog contains all changes made to elements with the same semantic id. +metrics_changelog: + my_app_latency: + - forward: + metric_name: my_app_latency_seconds + unit: "{seconds}" + value_promql: "metric{} / 1000" + backward: + metric_name: my_app_latency_milliseconds + unit: "{milliseconds}" + value_promql: "metric{} * 1000" + + my_app_custom_elements: + - forward: + metric_name: my_app_custom_elements_changed_total + attributes: + - tag: "number" + - tag: "class" + members: + - value: "FIRST" + - value: "SECOND" + - value: "OTHER" + backward: + metric_name: my_app_custom_elements_total + attributes: + - tag: "integer" + - tag: "category" + members: + - value: "first" + - value: "second" + - value: "other" + + my_app_some_elements_totals: diff --git a/semconv/testdata/latest/.gen/ids.yaml b/semconv/testdata/latest/.gen/ids.yaml new file mode 100644 index 0000000000..f471dab84e --- /dev/null +++ b/semconv/testdata/latest/.gen/ids.yaml @@ -0,0 +1,20 @@ +# Version of this file. +version: 1 + +# map from identity of an element to its id(s). +metrics_ids: + my_app_latency_seconds~seconds.histogram: + - id: "my_app_latency.2" + intro_version: "v1.1.0" # When introduced. + my_app_custom_elements_changed_total.counter: + - id: "my_app_custom_elements.2" + intro_version: "v1.1.0" + my_app_latency_milliseconds~milliseconds.histogram: + - id: "my_app_latency" + intro_version: "v1.0.0" + my_app_custom_elements_total.counter: + - id: "my_app_custom_elements" + intro_version: "v1.0.0" + my_app_some_elements~gauge: + - id: "my_app_some_elements" + intro_version: "v1.0.0" diff --git a/semconv/testdata/v1.0.0/.gitkeep b/semconv/testdata/v1.0.0/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/semconv/testdata/v1.1.0/.gitkeep b/semconv/testdata/v1.1.0/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/semconv/value.go b/semconv/value.go new file mode 100644 index 0000000000..6b7457a3e3 --- /dev/null +++ b/semconv/value.go @@ -0,0 +1,65 @@ +package semconv + +import ( + "fmt" + + "github.com/prometheus/prometheus/promql/parser" +) + +type valueTransformer struct { + expr parser.Expr +} + +func newValueTransformer(toPromQL string) (*valueTransformer, error) { + p := parser.NewParser(toPromQL) + expr, err := p.ParseExpr() + if err != nil { + return nil, fmt.Errorf("can't parse %v: %w", toPromQL, err) + } + if _, err = transform(expr, 0); err != nil { + return nil, err + } + + return &valueTransformer{expr: expr}, nil +} + +func (t *valueTransformer) Transform(v float64) float64 { + if t == nil { + return v // Noop. + } + // We did what we could and tested transform in constructor, skipping here. + v, _ = transform(t.expr, v) + return v +} + +func transform(node parser.Expr, in float64) (float64, error) { + switch e := node.(type) { + case *parser.NumberLiteral: + return e.Val, nil + case *parser.VectorSelector: + return in, nil + case *parser.BinaryExpr: + lhs, err := transform(e.LHS, in) + if err != nil { + return 0, err + } + rhs, err := transform(e.RHS, in) + if err != nil { + return 0, err + } + switch e.Op { + case parser.ADD: + return lhs + rhs, nil + case parser.SUB: + return lhs - rhs, nil + case parser.MUL: + return lhs * rhs, nil + case parser.DIV: + return lhs / rhs, nil + default: + return 0, fmt.Errorf("binary operator %v not allowed", parser.ItemTypeStr[e.Op]) + } + default: + return 0, fmt.Errorf("PromQL node %v not allowed", e.Type()) + } +} diff --git a/semconv/value_test.go b/semconv/value_test.go new file mode 100644 index 0000000000..84536a6389 --- /dev/null +++ b/semconv/value_test.go @@ -0,0 +1,24 @@ +package semconv + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValueTransformer(t *testing.T) { + v, err := newValueTransformer("value{} / 1000") + require.NoError(t, err) + require.Equal(t, float64(4), v.Transform(4000)) + require.Equal(t, float64(4), v.Transform(4000)) + + v, err = newValueTransformer("whatever{foo=\"bar\"} * 1024") + require.NoError(t, err) + require.Equal(t, float64(81920), v.Transform(80)) + require.Equal(t, float64(81920), v.Transform(80)) + + v, err = newValueTransformer("a{} + 15 - 44") + require.NoError(t, err) + require.Equal(t, float64(-27), v.Transform(2)) + require.Equal(t, float64(-27), v.Transform(2)) +}