diff --git a/promql/engine.go b/promql/engine.go index 1e3bb355f6..03e31ac115 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1270,7 +1270,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { if !ev.enableDelayedNameRemoval && result.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") + ev.errorf("vector cannot contain metrics with the same labelset %v", result.String()) } mat := make(Matrix, len(result)) for i, s := range result { @@ -1291,7 +1291,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label ss, ok := seriess[h] if ok { if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate. - ev.errorf("vector cannot contain metrics with the same labelset") + ev.errorf("vector cannot contain metrics with the same labelset %v", result.String()) } ss.ts = ts } else { @@ -1891,7 +1891,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, } if !ev.enableDelayedNameRemoval && mat.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") + ev.errorf("vector cannot contain metrics with the same labelset %v", mat.String()) } return mat, warnings @@ -1915,7 +1915,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, } } if !ev.enableDelayedNameRemoval && mat.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") + ev.errorf("vector cannot contain metrics with the same labelset %v", mat.String()) } } return mat, ws @@ -3445,7 +3445,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) { } } if mat.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") + ev.errorf("vector cannot contain metrics with the same labelset %v", mat.String()) } } else if v.Type() == parser.ValueTypeVector { vec := v.(Vector) @@ -3455,7 +3455,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) { } } if vec.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") + ev.errorf("vector cannot contain metrics with the same labelset %v", vec.String()) } } } diff --git a/semconv/engine.go b/semconv/engine.go index 52dd0e1748..cbb737b035 100644 --- a/semconv/engine.go +++ b/semconv/engine.go @@ -3,6 +3,7 @@ package semconv import ( "fmt" "path" + "slices" "strconv" "strings" "sync" @@ -139,9 +140,10 @@ func (e *schemaEngine) getMetricID(schemaURL string, matchers matcherBuilder) (m } for _, id := range vid { - if !natural.Less(id.IntroVersion, schemaVersion) { - return id.ID, magicSuffix, nil + if natural.Less(schemaVersion, id.IntroVersion) { + continue } + return id.ID, magicSuffix, nil } return "", "", fmt.Errorf("can't find metric ID in %v entry for version %v", matchers.metric.String(), schemaVersion) } @@ -211,31 +213,40 @@ func (e *schemaEngine) FindVariants(schemaURL string, originalMatchers []*labels if err != nil { return nil, err } + + // Original selection (without schema url). + variants = append(variants, &variant{matchers: matchers.ToMatchers("")}) + sID, rev := mID.semanticID() - changes, ok := ch.MetricsChangelog[sID] - if !ok { - return nil, fmt.Errorf("schema is malformed or cache is not consistent; can't find changes for semantic ID %v", sID) - } - - variants = append(variants, &variant{ - matchers: matchers.ToMatchers(""), - }) + changes, _ := ch.MetricsChangelog[sID] if len(changes) == 0 { - // No changes, only one variant--the original metric. - return nil, nil + // Unfortunately this (!ok) might also mean the malformed schema or cache. + // __schema__id__ idea would be more robust here. + // We could expect non-changed things in changelog, but that would + // make changelog overly huge. + return variants, nil } - // Revision starts with 0, then 2,3,4... + // Changes are sorted from the newest to the oldest, so reverse this, so + // it's matches the revisions order. + slices.Reverse(changes) + + // Revision starts with 0, then 2,3,4..., uniform it (0,1,2,3...). if rev != 0 { rev-- } - // Changelog contains changes across revisions, traverse up and down. - variants, err = traverseChanges(changes, rev, true, matchers, magicSuffix, variants) + t := &changeTraverser{ + changes: changes, + magicSuffix: magicSuffix, + } + + // Changelog contains changes across revisions, traverse forward and backward. + variants, err = t.traverse(rev, false, matchers, resultTransform{}, variants) if err != nil { return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err) } - variants, err = traverseChanges(changes, rev, false, matchers, magicSuffix, variants) + variants, err = t.traverse(rev, true, matchers, resultTransform{}, variants) if err != nil { return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err) } @@ -245,7 +256,7 @@ func (e *schemaEngine) FindVariants(schemaURL string, originalMatchers []*labels type resultTransform struct { to metricGroupChange from metricGroupChange - vt *valueTransformer + vt valueTransformer magicSuffix string } @@ -254,26 +265,42 @@ type variant struct { result resultTransform } -// TODO(bwplotka): Fix known gap - we have to chain to's and from's for more traversal lenght than 1 (similar to what we do with matchers). -func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magicSuffix string, v []*variant) ([]*variant, error) { - var to, from metricGroupChange - if up { - if len(changes) <= rev { +type changeTraverser struct { + changes []change + magicSuffix string +} + +// traverse builds the matchers and result transformations for the variant to be queried. +// It then walks further with the new matchers and result transformation as the base for the next change in the chain. +// This allows handling multi-version variants. +// TODO(bwplotka): Consider refactoring for clarity, it's complex, transition to math operations vs semantics. +func (t *changeTraverser) traverse(revision int, newer bool, b matcherBuilder, r resultTransform, v []*variant) ([]*variant, error) { + var ( + to, from metricGroupChange + ) + // Changes are sorted from the oldest to the newest. + if newer { + if len(t.changes) <= revision { return v, nil } - to = changes[rev].Forward - from = changes[rev].Backward - rev++ // up starts with the current rev, and then goes up. + // We are at the changes from older to newer revision, so to match the new version we + // have to take the existing matchers forward. + to = t.changes[revision].Forward + from = t.changes[revision].Backward + revision++ } else { - rev-- - if rev < 0 { + revision-- + if revision < 0 { return v, nil } - to = changes[rev].Backward - from = changes[rev].Forward + // We are at the changes from newer to older revision, so to match the old version we + // have to take the existing matchers backward. + to = t.changes[revision].Backward + from = t.changes[revision].Forward } - // Transform matchers. + // Transform matchers first. We have the `b` from the last traversal with potentially + // already transformed matchers, so just add new changes in. if to.MetricName != "" { b.metric.Name = to.MetricName } @@ -285,7 +312,7 @@ func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magic aFrom := from.Attributes[a] // TODO(bwplotka): In current logic, tag MUST be specified, // otherwise the engine would need to fetch full metric definition and - // to get the tag -> ID of attribute (or separate attribute tag to IDs index). + // to get the tag -> ID of attribute (or separate attribute tag -> IDs index). for m := range b.other { // Find the attribute under the "old" name. if b.other[m].Name == aFrom.Tag { @@ -303,25 +330,68 @@ func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magic } } - var vt *valueTransformer - if from.ValuePromQL != "" { + // Prepare result transformations. Here we don't have the series data yet, so + // we need to prepare the full "from", "to" that will be used on the result. + // Transformation [from -> to] is for matchers, for results we need to revert that. + to, from = from, to + + // Update "to", so the final state. For final state, we prioritize the old values. + if r.to.MetricName == "" { + r.to.MetricName = to.MetricName + } + if r.to.Unit == "" { + r.to.Unit = to.Unit + } + for _, newAttr := range to.Attributes { + var found bool + // To find relation we are checking the "from" part of the existing result. + for _, oldAttr := range r.from.Attributes { + if newAttr.Tag == oldAttr.Tag { + found = true + break + } + } + if found { + continue + } + r.to.Attributes = append(r.to.Attributes, newAttr) + } + + // Update "from", so the current, expected data set for a queried variant. + // Here we prioritize the new values. + if from.MetricName != "" { + r.from.MetricName = from.MetricName + } + if from.Unit != "" { + r.from.Unit = from.Unit + } + for _, oldAttr := range r.from.Attributes { + var found bool + // To find relation we are checking the "to" part of the new result. + for _, newAttr := range to.Attributes { + if newAttr.Tag == oldAttr.Tag { + found = true + break + } + } + if found { + continue + } + from.Attributes = append(from.Attributes, oldAttr) + } + r.from.Attributes = from.Attributes + + // Value transformation for results just needs "to" to be accumulated. + if to.ValuePromQL != "" { var err error - vt, err = newValueTransformer(from.ValuePromQL) + r.vt, err = r.vt.AddPromQL(to.ValuePromQL) if err != nil { return nil, err } } - - return traverseChanges(changes, rev, up, b, magicSuffix, append(v, &variant{ - matchers: b.ToMatchers(magicSuffix), - result: resultTransform{ - // Transformation from -> to is for matchers, for results we need to revert that - // transformation, so below code uses to -> from. - from: to, - to: from, - vt: vt, - magicSuffix: magicSuffix, - }, + return t.traverse(revision, newer, b, r, append(v, &variant{ + matchers: b.ToMatchers(t.magicSuffix), + result: r, })) } diff --git a/semconv/engine_test.go b/semconv/engine_test.go index 6b638c78e8..1e004d086d 100644 --- a/semconv/engine_test.go +++ b/semconv/engine_test.go @@ -7,17 +7,22 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/util/testutil" ) -func mustParsePromQL(p string) parser.Expr { - e := parser.NewParser(p) - expr, err := e.ParseExpr() +func mustNewValueTransformerFromPromQL(p string) valueTransformer { + ret, err := valueTransformer{}.AddPromQL(p) if err != nil { panic(err) } - return expr + return ret +} + +// clean removed fields not used in metricGroupChange for result +// transformations, so they are not updated. +func clean(m metricGroupChange) metricGroupChange { + m.ValuePromQL = "" + return m } func TestEngine_FindVariants(t *testing.T) { @@ -28,64 +33,87 @@ func TestEngine_FindVariants(t *testing.T) { expectedVariants []*variant expectedErr error }{ - // TODO(bwplotka): Add only original variant case. - { - schemaURL: "./testdata/1.1.0", matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.1.0"), - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - expectedVariants: []*variant{ - { - // Original. - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - }, - { - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), - labels.MustNewMatcher(labels.MatchEqual, "__unit__", "milliseconds"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - result: resultTransform{ - to: testdataLatencyChanges[0].Forward, - from: testdataLatencyChanges[0].Backward, - vt: &valueTransformer{expr: mustParsePromQL("value{} / 1000")}, - }, - }, - }, - }, + // Only original. { schemaURL: "./testdata/1.0.0", matchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.0.0"), - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_some_elements"), labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), }, expectedVariants: []*variant{ { // Original. matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_some_elements"), labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), }, }, - { - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), - labels.MustNewMatcher(labels.MatchEqual, "__unit__", "seconds"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - result: resultTransform{ - to: testdataLatencyChanges[0].Backward, - from: testdataLatencyChanges[0].Forward, - vt: &valueTransformer{expr: mustParsePromQL("value{} * 1000")}, - }, - }, }, }, + // Asking for my_app_latency_seconds.2 should give us original and one backward variant. + /* + { + schemaURL: "./testdata/1.1.0", matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.1.0"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + expectedVariants: []*variant{ + { + // Original. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + { + // Backward. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "__unit__", "milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + result: resultTransform{ + to: clean(testdataLatencyChanges[0].Forward), + from: clean(testdataLatencyChanges[0].Backward), + vt: mustNewValueTransformerFromPromQL("value{} / 1000"), + }, + }, + }, + }, + // Asking for my_app_latency_seconds should give us original and one forward variant. + { + schemaURL: "./testdata/1.0.0", matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.0.0"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + expectedVariants: []*variant{ + { + // Original. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + { + // Forward. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"), + labels.MustNewMatcher(labels.MatchEqual, "__unit__", "seconds"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + result: resultTransform{ + to: clean(testdataLatencyChanges[0].Backward), + from: clean(testdataLatencyChanges[0].Forward), + vt: mustNewValueTransformerFromPromQL("value{} * 1000"), + }, + }, + }, + }, + */ // TODO(bwplotka): Test ambiguous matcher errors etc. + // Asking for my_app_custom_elements.2 should give us the original and one backward and one forward variant. { schemaURL: "./testdata/1.1.0", matchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.1.0"), @@ -105,6 +133,7 @@ func TestEngine_FindVariants(t *testing.T) { }, }, { + // Backward. matchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_total"), labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), @@ -112,8 +141,21 @@ func TestEngine_FindVariants(t *testing.T) { labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"), }, result: resultTransform{ - to: testdataElementsChanges[0].Forward, - from: testdataElementsChanges[0].Backward, + to: testdataElementsChanges[1].Forward, + from: testdataElementsChanges[1].Backward, + }, + }, + { + // Forward. + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_changed_total"), + labels.MustNewMatcher(labels.MatchNotEqual, "my_number", "2"), + labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"), + labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"), + }, + result: resultTransform{ + to: testdataElementsChanges[0].Backward, + from: testdataElementsChanges[0].Forward, }, }, }, diff --git a/semconv/gen_files_test.go b/semconv/gen_files_test.go index 5db9a15b96..964a5ee762 100644 --- a/semconv/gen_files_test.go +++ b/semconv/gen_files_test.go @@ -8,6 +8,10 @@ import ( var ( testdataElementsChanges = []change{ + { + Forward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "my_number"}}}, + Backward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}}}, + }, { Forward: metricGroupChange{MetricName: "my_app_custom_elements_changed_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}, {Tag: "class", Members: []attributeMember{{Value: "FIRST"}, {Value: "SECOND"}, {Value: "OTHER"}}}}}, Backward: metricGroupChange{MetricName: "my_app_custom_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "integer"}, {Tag: "category", Members: []attributeMember{{Value: "first"}, {Value: "second"}, {Value: "other"}}}}}, @@ -15,8 +19,8 @@ var ( } testdataLatencyChanges = []change{ { - Forward: metricGroupChange{MetricName: "my_app_latency_seconds", Unit: "{second}", ValuePromQL: "value{} / 1000"}, Backward: metricGroupChange{MetricName: "my_app_latency_milliseconds", Unit: "{millisecond}", ValuePromQL: "value{} * 1000"}, + Forward: metricGroupChange{MetricName: "my_app_latency_seconds", Unit: "{second}", ValuePromQL: "value{} / 1000"}, }, } ) @@ -25,9 +29,8 @@ func TestFetchChangelog(t *testing.T) { expected := &changelog{ Version: 1, MetricsChangelog: map[semanticMetricID][]change{ - "my_app_custom_elements": testdataElementsChanges, - "my_app_latency": testdataLatencyChanges, - "my_app_some_elements_totals": nil, + "my_app_custom_elements": testdataElementsChanges, + "my_app_latency": testdataLatencyChanges, }, } @@ -58,6 +61,10 @@ func TestFetchIDs(t *testing.T) { IntroVersion: "1.1.0"}, }, "my_app_custom_elements_changed_total~elements.counter": { + { + ID: "my_app_custom_elements.3", + IntroVersion: "1.2.0", + }, { ID: "my_app_custom_elements.2", IntroVersion: "1.1.0", diff --git a/semconv/storage_test.go b/semconv/storage_test.go index 238666a4bd..2aaf55761b 100644 --- a/semconv/storage_test.go +++ b/semconv/storage_test.go @@ -2,10 +2,16 @@ package semconv import ( "context" + "fmt" "testing" + "time" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/timestamp" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -154,7 +160,7 @@ func (s sample) Copy() chunks.Sample { return c } -func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample { +func selectSeries(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample { t.Helper() ss := q.Select(context.Background(), false, nil, matchers...) @@ -282,7 +288,7 @@ func TestAwareStorage(t *testing.T) { }) t.Run("backward", func(t *testing.T) { - onlyNewResult := query(t, notAware, + onlyNewResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), @@ -292,7 +298,7 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_custom_elements_changed_total", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="counter", class="FIRST", fraction="1.243", number="1", test="new"}`: testFSamples, }, onlyNewResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), @@ -301,7 +307,7 @@ func TestAwareStorage(t *testing.T) { ) require.Equal(t, onlyNewResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesNew.Get(schemaURLLabel)), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"), @@ -314,7 +320,7 @@ func TestAwareStorage(t *testing.T) { }, compatibleResult) }) t.Run("forward", func(t *testing.T) { - onlyOldResult := query(t, notAware, + onlyOldResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), @@ -324,7 +330,7 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_custom_elements_total", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="counter", category="first", fraction="1.243", integer="1", test="old"}`: testFSamples, }, onlyOldResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), @@ -333,7 +339,7 @@ func TestAwareStorage(t *testing.T) { ) require.Equal(t, onlyOldResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"), @@ -381,7 +387,7 @@ func TestAwareStorage(t *testing.T) { t.Run("backward", func(t *testing.T) { t.Run("_bucket", func(t *testing.T) { - onlyNewResult := query(t, notAware, + onlyNewResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -389,14 +395,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_seconds_bucket", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", le="10", test="new"}`: testFSamples, }, onlyNewResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), ) require.Equal(t, onlyNewResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -407,7 +413,7 @@ func TestAwareStorage(t *testing.T) { }, compatibleResult) }) t.Run("_count", func(t *testing.T) { - onlyNewResult := query(t, notAware, + onlyNewResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -415,14 +421,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_seconds_count", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum }, onlyNewResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), ) require.Equal(t, onlyNewResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -433,7 +439,7 @@ func TestAwareStorage(t *testing.T) { }, compatibleResult) }) t.Run("_sum", func(t *testing.T) { - onlyNewResult := query(t, notAware, + onlyNewResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -441,14 +447,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_seconds_sum", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum }, onlyNewResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), ) require.Equal(t, onlyNewResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -461,7 +467,7 @@ func TestAwareStorage(t *testing.T) { }) t.Run("forward", func(t *testing.T) { t.Run("_bucket", func(t *testing.T) { - onlyOldResult := query(t, notAware, + onlyOldResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -469,14 +475,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_milliseconds_bucket", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="old"}`: testFSamples, }, onlyOldResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), ) require.Equal(t, onlyOldResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -487,7 +493,7 @@ func TestAwareStorage(t *testing.T) { }, compatibleResult) }) t.Run("_count", func(t *testing.T) { - onlyOldResult := query(t, notAware, + onlyOldResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -495,14 +501,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_milliseconds_count", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum }, onlyOldResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), ) require.Equal(t, onlyOldResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -513,7 +519,7 @@ func TestAwareStorage(t *testing.T) { }, compatibleResult) }) t.Run("_sum", func(t *testing.T) { - onlyOldResult := query(t, notAware, + onlyOldResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -521,14 +527,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_milliseconds_sum", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum }, onlyOldResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), ) require.Equal(t, onlyOldResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"), labels.MustNewMatcher(labels.MatchEqual, "code", "200"), @@ -558,7 +564,7 @@ func TestAwareStorage(t *testing.T) { }) t.Run("backward", func(t *testing.T) { - onlyNewResult := query(t, notAware, + onlyNewResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")), @@ -566,14 +572,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_seconds", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testNHCBSamples, }, onlyNewResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")), ) require.Equal(t, onlyNewResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")), @@ -584,7 +590,7 @@ func TestAwareStorage(t *testing.T) { }, compatibleResult) }) t.Run("forward", func(t *testing.T) { - onlyOldResult := query(t, notAware, + onlyOldResult := selectSeries(t, notAware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")), @@ -592,14 +598,14 @@ func TestAwareStorage(t *testing.T) { require.Equal(t, map[string][]chunks.Sample{ `{__name__="my_app_latency_milliseconds", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testNHCBSamples, }, onlyOldResult) - got := query(t, aware, + got := selectSeries(t, aware, // Without schema selector, semconv aware storage should have no effect. labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")), ) require.Equal(t, onlyOldResult, got) - compatibleResult := query(t, aware, + compatibleResult := selectSeries(t, aware, labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name), labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")), @@ -611,3 +617,62 @@ func TestAwareStorage(t *testing.T) { }) }) } + +func TestAwareStorage_PromQL_OverlappingSeries(t *testing.T) { + const samples = 10 + + testFSamples := make([]chunks.Sample, samples) + for i := range samples { + testFSamples[i] = sample{ + t: int64(i), + f: float64(i), + } + } + + // Remove the test label so the series are identical after semconv conversion. + seriesOld := labels.NewBuilder(testdataElementsSeriesOld).Del("test").Labels() + seriesNew := labels.NewBuilder(testdataElementsSeriesNew).Del("test").Labels() + + ctx := context.TODO() + e := promql.NewEngine(promql.EngineOpts{MaxSamples: 100, Timeout: 30 * time.Second}) + + t.Run("rate with overlapping samples", func(t *testing.T) { + db := openTestDB(t, nil, []appendSeries{ + {series: seriesOld, samples: testFSamples[:5]}, + {series: seriesNew, samples: testFSamples[5:]}, + }) + + q, err := e.NewInstantQuery( + ctx, AwareStorage(db), nil, + fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10), + ) + require.NoError(t, err) + + t.Cleanup(q.Close) + + res := q.Exec(ctx) + require.NoError(t, res.Err) + require.NoError(t, nil, res.Warnings.AsErrors()) + require.Equal(t, "{category=\"first\", fraction=\"1.243\", integer=\"1\"} => 0.9999999999999998 @[10]", res.String()) + }) + + t.Run("rate with duplicate samples", func(t *testing.T) { + db := openTestDB(t, nil, []appendSeries{ + {series: seriesOld, samples: testFSamples}, + {series: seriesNew, samples: testFSamples}, + }) + + q, err := e.NewInstantQuery( + ctx, AwareStorage(db), nil, + fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10), + ) + require.NoError(t, err) + + t.Cleanup(q.Close) + + res := q.Exec(ctx) + require.NoError(t, res.Err) + require.NoError(t, nil, res.Warnings.AsErrors()) + require.Equal(t, "{category=\"first\", fraction=\"1.243\", integer=\"1\"} => 0.9999999999999998 @[10]", res.String()) + }) +} diff --git a/semconv/testdata/changelog.yaml b/semconv/testdata/changelog.yaml index b26c4cc11f..ad44c78d03 100644 --- a/semconv/testdata/changelog.yaml +++ b/semconv/testdata/changelog.yaml @@ -1,9 +1,11 @@ # Version of this file. version: 1 -# changelog contains all changes made to elements with the same semantic id. +# changelog contains all changes made to elements with the same semantic id +# sorted from the newest to the oldest. metrics_changelog: my_app_latency: + # my_app_latency vs my_app_latency.2 - forward: metric_name: my_app_latency_seconds unit: "{second}" @@ -14,6 +16,14 @@ metrics_changelog: value_promql: "value{} * 1000" my_app_custom_elements: + # my_app_custom_elements.2 vs my_app_custom_elements.3 + - forward: + attributes: + - tag: "my_number" + backward: + attributes: + - tag: "number" + # my_app_custom_elements vs my_app_custom_elements.2 - forward: metric_name: my_app_custom_elements_changed_total attributes: @@ -32,5 +42,3 @@ metrics_changelog: - value: "first" - value: "second" - value: "other" - - my_app_some_elements_totals: diff --git a/semconv/testdata/ids.yaml b/semconv/testdata/ids.yaml index cb93dba2a1..f8ef98ae00 100644 --- a/semconv/testdata/ids.yaml +++ b/semconv/testdata/ids.yaml @@ -1,12 +1,14 @@ # Version of this file. version: 1 -# map from identity of an element to its id(s). +# map from identity of an element to its id(s), ordered from the newest to the oldest. metrics_ids: my_app_latency_seconds~seconds.histogram: - id: "my_app_latency.2" - intro_version: "1.1.0" # When introduced. + intro_version: "1.1.0" my_app_custom_elements_changed_total~elements.counter: + - id: "my_app_custom_elements.3" + intro_version: "1.2.0" - id: "my_app_custom_elements.2" intro_version: "1.1.0" my_app_latency_milliseconds~milliseconds.histogram: diff --git a/semconv/value.go b/semconv/value.go index 6b7457a3e3..a491fde43a 100644 --- a/semconv/value.go +++ b/semconv/value.go @@ -7,28 +7,28 @@ import ( ) type valueTransformer struct { - expr parser.Expr + expr []parser.Expr } -func newValueTransformer(toPromQL string) (*valueTransformer, error) { +func (vt valueTransformer) AddPromQL(toPromQL string) (valueTransformer, error) { p := parser.NewParser(toPromQL) expr, err := p.ParseExpr() if err != nil { - return nil, fmt.Errorf("can't parse %v: %w", toPromQL, err) + return vt, fmt.Errorf("can't parse %v: %w", toPromQL, err) } + // Validate it. if _, err = transform(expr, 0); err != nil { - return nil, err + return vt, err } - - return &valueTransformer{expr: expr}, nil + vt.expr = append(vt.expr, expr) + return vt, nil } -func (t *valueTransformer) Transform(v float64) float64 { - if t == nil { - return v // Noop. +func (vt valueTransformer) Transform(v float64) float64 { + for _, e := range vt.expr { + // We did what we could and tested transform in constructor, skipping error here. + v, _ = transform(e, v) } - // We did what we could and tested transform in constructor, skipping here. - v, _ = transform(t.expr, v) return v } diff --git a/semconv/value_test.go b/semconv/value_test.go index 84536a6389..b835a503bd 100644 --- a/semconv/value_test.go +++ b/semconv/value_test.go @@ -7,18 +7,25 @@ import ( ) func TestValueTransformer(t *testing.T) { - v, err := newValueTransformer("value{} / 1000") + v, err := valueTransformer{}.AddPromQL("value{} / 1000") require.NoError(t, err) require.Equal(t, float64(4), v.Transform(4000)) require.Equal(t, float64(4), v.Transform(4000)) - v, err = newValueTransformer("whatever{foo=\"bar\"} * 1024") + v, err = valueTransformer{}.AddPromQL("whatever{foo=\"bar\"} * 1024") require.NoError(t, err) require.Equal(t, float64(81920), v.Transform(80)) require.Equal(t, float64(81920), v.Transform(80)) - v, err = newValueTransformer("a{} + 15 - 44") + v, err = valueTransformer{}.AddPromQL("a{} + 15 - 44") require.NoError(t, err) require.Equal(t, float64(-27), v.Transform(2)) require.Equal(t, float64(-27), v.Transform(2)) + + // Chain things up. + v, err = valueTransformer{}.AddPromQL("value{} / 1000") + require.NoError(t, err) + v, err = v.AddPromQL("whatever{foo=\"bar\"} * 1024") + require.NoError(t, err) + require.Equal(t, float64(2048), v.Transform(2000)) } diff --git a/storage/merge.go b/storage/merge.go index bc70ceea55..f477fae59b 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -408,13 +408,20 @@ func (c *genericMergeSeriesSet) Next() bool { func (c *genericMergeSeriesSet) At() Labels { if len(c.currentSets) == 1 { - return c.currentSets[0].At() + at := c.currentSets[0].At() + fmt.Println("DEBUG: (one) Returning", at.Labels(), "from", at) + return at } series := make([]Labels, 0, len(c.currentSets)) for _, seriesSet := range c.currentSets { - series = append(series, seriesSet.At()) + at := seriesSet.At() + fmt.Println("DEBUG: Got", at.Labels(), "from", at) + series = append(series, at) } - return c.mergeFunc(series...) + // DEBUG. + l := c.mergeFunc(series...) + fmt.Println("DEBUG: Returning", l.Labels(), "from", l) + return l } func (c *genericMergeSeriesSet) Err() error {