Add support for chained changes.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-03-27 10:01:31 +00:00
parent f959bf8c7c
commit 54d255ef4c
10 changed files with 366 additions and 158 deletions

View file

@ -1270,7 +1270,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
if !ev.enableDelayedNameRemoval && result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
ev.errorf("vector cannot contain metrics with the same labelset %v", result.String())
}
mat := make(Matrix, len(result))
for i, s := range result {
@ -1291,7 +1291,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label
ss, ok := seriess[h]
if ok {
if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
ev.errorf("vector cannot contain metrics with the same labelset")
ev.errorf("vector cannot contain metrics with the same labelset %v", result.String())
}
ss.ts = ts
} else {
@ -1891,7 +1891,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
}
if !ev.enableDelayedNameRemoval && mat.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
ev.errorf("vector cannot contain metrics with the same labelset %v", mat.String())
}
return mat, warnings
@ -1915,7 +1915,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
}
}
if !ev.enableDelayedNameRemoval && mat.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
ev.errorf("vector cannot contain metrics with the same labelset %v", mat.String())
}
}
return mat, ws
@ -3445,7 +3445,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) {
}
}
if mat.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
ev.errorf("vector cannot contain metrics with the same labelset %v", mat.String())
}
} else if v.Type() == parser.ValueTypeVector {
vec := v.(Vector)
@ -3455,7 +3455,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) {
}
}
if vec.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
ev.errorf("vector cannot contain metrics with the same labelset %v", vec.String())
}
}
}

View file

@ -3,6 +3,7 @@ package semconv
import (
"fmt"
"path"
"slices"
"strconv"
"strings"
"sync"
@ -139,9 +140,10 @@ func (e *schemaEngine) getMetricID(schemaURL string, matchers matcherBuilder) (m
}
for _, id := range vid {
if !natural.Less(id.IntroVersion, schemaVersion) {
return id.ID, magicSuffix, nil
if natural.Less(schemaVersion, id.IntroVersion) {
continue
}
return id.ID, magicSuffix, nil
}
return "", "", fmt.Errorf("can't find metric ID in %v entry for version %v", matchers.metric.String(), schemaVersion)
}
@ -211,31 +213,40 @@ func (e *schemaEngine) FindVariants(schemaURL string, originalMatchers []*labels
if err != nil {
return nil, err
}
// Original selection (without schema url).
variants = append(variants, &variant{matchers: matchers.ToMatchers("")})
sID, rev := mID.semanticID()
changes, ok := ch.MetricsChangelog[sID]
if !ok {
return nil, fmt.Errorf("schema is malformed or cache is not consistent; can't find changes for semantic ID %v", sID)
}
variants = append(variants, &variant{
matchers: matchers.ToMatchers(""),
})
changes, _ := ch.MetricsChangelog[sID]
if len(changes) == 0 {
// No changes, only one variant--the original metric.
return nil, nil
// Unfortunately this (!ok) might also mean the malformed schema or cache.
// __schema__id__ idea would be more robust here.
// We could expect non-changed things in changelog, but that would
// make changelog overly huge.
return variants, nil
}
// Revision starts with 0, then 2,3,4...
// Changes are sorted from the newest to the oldest, so reverse this, so
// it's matches the revisions order.
slices.Reverse(changes)
// Revision starts with 0, then 2,3,4..., uniform it (0,1,2,3...).
if rev != 0 {
rev--
}
// Changelog contains changes across revisions, traverse up and down.
variants, err = traverseChanges(changes, rev, true, matchers, magicSuffix, variants)
t := &changeTraverser{
changes: changes,
magicSuffix: magicSuffix,
}
// Changelog contains changes across revisions, traverse forward and backward.
variants, err = t.traverse(rev, false, matchers, resultTransform{}, variants)
if err != nil {
return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err)
}
variants, err = traverseChanges(changes, rev, false, matchers, magicSuffix, variants)
variants, err = t.traverse(rev, true, matchers, resultTransform{}, variants)
if err != nil {
return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err)
}
@ -245,7 +256,7 @@ func (e *schemaEngine) FindVariants(schemaURL string, originalMatchers []*labels
type resultTransform struct {
to metricGroupChange
from metricGroupChange
vt *valueTransformer
vt valueTransformer
magicSuffix string
}
@ -254,26 +265,42 @@ type variant struct {
result resultTransform
}
// TODO(bwplotka): Fix known gap - we have to chain to's and from's for more traversal lenght than 1 (similar to what we do with matchers).
func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magicSuffix string, v []*variant) ([]*variant, error) {
var to, from metricGroupChange
if up {
if len(changes) <= rev {
type changeTraverser struct {
changes []change
magicSuffix string
}
// traverse builds the matchers and result transformations for the variant to be queried.
// It then walks further with the new matchers and result transformation as the base for the next change in the chain.
// This allows handling multi-version variants.
// TODO(bwplotka): Consider refactoring for clarity, it's complex, transition to math operations vs semantics.
func (t *changeTraverser) traverse(revision int, newer bool, b matcherBuilder, r resultTransform, v []*variant) ([]*variant, error) {
var (
to, from metricGroupChange
)
// Changes are sorted from the oldest to the newest.
if newer {
if len(t.changes) <= revision {
return v, nil
}
to = changes[rev].Forward
from = changes[rev].Backward
rev++ // up starts with the current rev, and then goes up.
// We are at the changes from older to newer revision, so to match the new version we
// have to take the existing matchers forward.
to = t.changes[revision].Forward
from = t.changes[revision].Backward
revision++
} else {
rev--
if rev < 0 {
revision--
if revision < 0 {
return v, nil
}
to = changes[rev].Backward
from = changes[rev].Forward
// We are at the changes from newer to older revision, so to match the old version we
// have to take the existing matchers backward.
to = t.changes[revision].Backward
from = t.changes[revision].Forward
}
// Transform matchers.
// Transform matchers first. We have the `b` from the last traversal with potentially
// already transformed matchers, so just add new changes in.
if to.MetricName != "" {
b.metric.Name = to.MetricName
}
@ -285,7 +312,7 @@ func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magic
aFrom := from.Attributes[a]
// TODO(bwplotka): In current logic, tag MUST be specified,
// otherwise the engine would need to fetch full metric definition and
// to get the tag -> ID of attribute (or separate attribute tag to IDs index).
// to get the tag -> ID of attribute (or separate attribute tag -> IDs index).
for m := range b.other {
// Find the attribute under the "old" name.
if b.other[m].Name == aFrom.Tag {
@ -303,25 +330,68 @@ func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magic
}
}
var vt *valueTransformer
if from.ValuePromQL != "" {
// Prepare result transformations. Here we don't have the series data yet, so
// we need to prepare the full "from", "to" that will be used on the result.
// Transformation [from -> to] is for matchers, for results we need to revert that.
to, from = from, to
// Update "to", so the final state. For final state, we prioritize the old values.
if r.to.MetricName == "" {
r.to.MetricName = to.MetricName
}
if r.to.Unit == "" {
r.to.Unit = to.Unit
}
for _, newAttr := range to.Attributes {
var found bool
// To find relation we are checking the "from" part of the existing result.
for _, oldAttr := range r.from.Attributes {
if newAttr.Tag == oldAttr.Tag {
found = true
break
}
}
if found {
continue
}
r.to.Attributes = append(r.to.Attributes, newAttr)
}
// Update "from", so the current, expected data set for a queried variant.
// Here we prioritize the new values.
if from.MetricName != "" {
r.from.MetricName = from.MetricName
}
if from.Unit != "" {
r.from.Unit = from.Unit
}
for _, oldAttr := range r.from.Attributes {
var found bool
// To find relation we are checking the "to" part of the new result.
for _, newAttr := range to.Attributes {
if newAttr.Tag == oldAttr.Tag {
found = true
break
}
}
if found {
continue
}
from.Attributes = append(from.Attributes, oldAttr)
}
r.from.Attributes = from.Attributes
// Value transformation for results just needs "to" to be accumulated.
if to.ValuePromQL != "" {
var err error
vt, err = newValueTransformer(from.ValuePromQL)
r.vt, err = r.vt.AddPromQL(to.ValuePromQL)
if err != nil {
return nil, err
}
}
return traverseChanges(changes, rev, up, b, magicSuffix, append(v, &variant{
matchers: b.ToMatchers(magicSuffix),
result: resultTransform{
// Transformation from -> to is for matchers, for results we need to revert that
// transformation, so below code uses to -> from.
from: to,
to: from,
vt: vt,
magicSuffix: magicSuffix,
},
return t.traverse(revision, newer, b, r, append(v, &variant{
matchers: b.ToMatchers(t.magicSuffix),
result: r,
}))
}

View file

@ -7,17 +7,22 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/testutil"
)
func mustParsePromQL(p string) parser.Expr {
e := parser.NewParser(p)
expr, err := e.ParseExpr()
func mustNewValueTransformerFromPromQL(p string) valueTransformer {
ret, err := valueTransformer{}.AddPromQL(p)
if err != nil {
panic(err)
}
return expr
return ret
}
// clean removed fields not used in metricGroupChange for result
// transformations, so they are not updated.
func clean(m metricGroupChange) metricGroupChange {
m.ValuePromQL = ""
return m
}
func TestEngine_FindVariants(t *testing.T) {
@ -28,64 +33,87 @@ func TestEngine_FindVariants(t *testing.T) {
expectedVariants []*variant
expectedErr error
}{
// TODO(bwplotka): Add only original variant case.
{
schemaURL: "./testdata/1.1.0", matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.1.0"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
expectedVariants: []*variant{
{
// Original.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
{
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, "__unit__", "milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
result: resultTransform{
to: testdataLatencyChanges[0].Forward,
from: testdataLatencyChanges[0].Backward,
vt: &valueTransformer{expr: mustParsePromQL("value{} / 1000")},
},
},
},
},
// Only original.
{
schemaURL: "./testdata/1.0.0", matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.0.0"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_some_elements"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
expectedVariants: []*variant{
{
// Original.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_some_elements"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
{
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
labels.MustNewMatcher(labels.MatchEqual, "__unit__", "seconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
result: resultTransform{
to: testdataLatencyChanges[0].Backward,
from: testdataLatencyChanges[0].Forward,
vt: &valueTransformer{expr: mustParsePromQL("value{} * 1000")},
},
},
},
},
// Asking for my_app_latency_seconds.2 should give us original and one backward variant.
/*
{
schemaURL: "./testdata/1.1.0", matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.1.0"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
expectedVariants: []*variant{
{
// Original.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
{
// Backward.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, "__unit__", "milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
result: resultTransform{
to: clean(testdataLatencyChanges[0].Forward),
from: clean(testdataLatencyChanges[0].Backward),
vt: mustNewValueTransformerFromPromQL("value{} / 1000"),
},
},
},
},
// Asking for my_app_latency_seconds should give us original and one forward variant.
{
schemaURL: "./testdata/1.0.0", matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.0.0"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
expectedVariants: []*variant{
{
// Original.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
{
// Forward.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
labels.MustNewMatcher(labels.MatchEqual, "__unit__", "seconds"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
result: resultTransform{
to: clean(testdataLatencyChanges[0].Backward),
from: clean(testdataLatencyChanges[0].Forward),
vt: mustNewValueTransformerFromPromQL("value{} * 1000"),
},
},
},
},
*/
// TODO(bwplotka): Test ambiguous matcher errors etc.
// Asking for my_app_custom_elements.2 should give us the original and one backward and one forward variant.
{
schemaURL: "./testdata/1.1.0", matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/1.1.0"),
@ -105,6 +133,7 @@ func TestEngine_FindVariants(t *testing.T) {
},
},
{
// Backward.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_total"),
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
@ -112,8 +141,21 @@ func TestEngine_FindVariants(t *testing.T) {
labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"),
},
result: resultTransform{
to: testdataElementsChanges[0].Forward,
from: testdataElementsChanges[0].Backward,
to: testdataElementsChanges[1].Forward,
from: testdataElementsChanges[1].Backward,
},
},
{
// Forward.
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_changed_total"),
labels.MustNewMatcher(labels.MatchNotEqual, "my_number", "2"),
labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"),
labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"),
},
result: resultTransform{
to: testdataElementsChanges[0].Backward,
from: testdataElementsChanges[0].Forward,
},
},
},

View file

@ -8,6 +8,10 @@ import (
var (
testdataElementsChanges = []change{
{
Forward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "my_number"}}},
Backward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}}},
},
{
Forward: metricGroupChange{MetricName: "my_app_custom_elements_changed_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}, {Tag: "class", Members: []attributeMember{{Value: "FIRST"}, {Value: "SECOND"}, {Value: "OTHER"}}}}},
Backward: metricGroupChange{MetricName: "my_app_custom_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "integer"}, {Tag: "category", Members: []attributeMember{{Value: "first"}, {Value: "second"}, {Value: "other"}}}}},
@ -15,8 +19,8 @@ var (
}
testdataLatencyChanges = []change{
{
Forward: metricGroupChange{MetricName: "my_app_latency_seconds", Unit: "{second}", ValuePromQL: "value{} / 1000"},
Backward: metricGroupChange{MetricName: "my_app_latency_milliseconds", Unit: "{millisecond}", ValuePromQL: "value{} * 1000"},
Forward: metricGroupChange{MetricName: "my_app_latency_seconds", Unit: "{second}", ValuePromQL: "value{} / 1000"},
},
}
)
@ -25,9 +29,8 @@ func TestFetchChangelog(t *testing.T) {
expected := &changelog{
Version: 1,
MetricsChangelog: map[semanticMetricID][]change{
"my_app_custom_elements": testdataElementsChanges,
"my_app_latency": testdataLatencyChanges,
"my_app_some_elements_totals": nil,
"my_app_custom_elements": testdataElementsChanges,
"my_app_latency": testdataLatencyChanges,
},
}
@ -58,6 +61,10 @@ func TestFetchIDs(t *testing.T) {
IntroVersion: "1.1.0"},
},
"my_app_custom_elements_changed_total~elements.counter": {
{
ID: "my_app_custom_elements.3",
IntroVersion: "1.2.0",
},
{
ID: "my_app_custom_elements.2",
IntroVersion: "1.1.0",

View file

@ -2,10 +2,16 @@ package semconv
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -154,7 +160,7 @@ func (s sample) Copy() chunks.Sample {
return c
}
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
func selectSeries(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
t.Helper()
ss := q.Select(context.Background(), false, nil, matchers...)
@ -282,7 +288,7 @@ func TestAwareStorage(t *testing.T) {
})
t.Run("backward", func(t *testing.T) {
onlyNewResult := query(t, notAware,
onlyNewResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
@ -292,7 +298,7 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_custom_elements_changed_total", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="counter", class="FIRST", fraction="1.243", number="1", test="new"}`: testFSamples,
}, onlyNewResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
@ -301,7 +307,7 @@ func TestAwareStorage(t *testing.T) {
)
require.Equal(t, onlyNewResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesNew.Get(schemaURLLabel)),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
@ -314,7 +320,7 @@ func TestAwareStorage(t *testing.T) {
}, compatibleResult)
})
t.Run("forward", func(t *testing.T) {
onlyOldResult := query(t, notAware,
onlyOldResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
@ -324,7 +330,7 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_custom_elements_total", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="counter", category="first", fraction="1.243", integer="1", test="old"}`: testFSamples,
}, onlyOldResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
@ -333,7 +339,7 @@ func TestAwareStorage(t *testing.T) {
)
require.Equal(t, onlyOldResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
@ -381,7 +387,7 @@ func TestAwareStorage(t *testing.T) {
t.Run("backward", func(t *testing.T) {
t.Run("_bucket", func(t *testing.T) {
onlyNewResult := query(t, notAware,
onlyNewResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -389,14 +395,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_seconds_bucket", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", le="10", test="new"}`: testFSamples,
}, onlyNewResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
)
require.Equal(t, onlyNewResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -407,7 +413,7 @@ func TestAwareStorage(t *testing.T) {
}, compatibleResult)
})
t.Run("_count", func(t *testing.T) {
onlyNewResult := query(t, notAware,
onlyNewResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -415,14 +421,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_seconds_count", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
}, onlyNewResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
)
require.Equal(t, onlyNewResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -433,7 +439,7 @@ func TestAwareStorage(t *testing.T) {
}, compatibleResult)
})
t.Run("_sum", func(t *testing.T) {
onlyNewResult := query(t, notAware,
onlyNewResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -441,14 +447,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_seconds_sum", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
}, onlyNewResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
)
require.Equal(t, onlyNewResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -461,7 +467,7 @@ func TestAwareStorage(t *testing.T) {
})
t.Run("forward", func(t *testing.T) {
t.Run("_bucket", func(t *testing.T) {
onlyOldResult := query(t, notAware,
onlyOldResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -469,14 +475,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_milliseconds_bucket", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="old"}`: testFSamples,
}, onlyOldResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
)
require.Equal(t, onlyOldResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -487,7 +493,7 @@ func TestAwareStorage(t *testing.T) {
}, compatibleResult)
})
t.Run("_count", func(t *testing.T) {
onlyOldResult := query(t, notAware,
onlyOldResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -495,14 +501,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_milliseconds_count", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
}, onlyOldResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
)
require.Equal(t, onlyOldResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -513,7 +519,7 @@ func TestAwareStorage(t *testing.T) {
}, compatibleResult)
})
t.Run("_sum", func(t *testing.T) {
onlyOldResult := query(t, notAware,
onlyOldResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -521,14 +527,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_milliseconds_sum", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
}, onlyOldResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
)
require.Equal(t, onlyOldResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"),
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
@ -558,7 +564,7 @@ func TestAwareStorage(t *testing.T) {
})
t.Run("backward", func(t *testing.T) {
onlyNewResult := query(t, notAware,
onlyNewResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")),
@ -566,14 +572,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_seconds", __schema_url__="` + testSchemaURL("1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testNHCBSamples,
}, onlyNewResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")),
)
require.Equal(t, onlyNewResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.1.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")),
@ -584,7 +590,7 @@ func TestAwareStorage(t *testing.T) {
}, compatibleResult)
})
t.Run("forward", func(t *testing.T) {
onlyOldResult := query(t, notAware,
onlyOldResult := selectSeries(t, notAware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")),
@ -592,14 +598,14 @@ func TestAwareStorage(t *testing.T) {
require.Equal(t, map[string][]chunks.Sample{
`{__name__="my_app_latency_milliseconds", __schema_url__="` + testSchemaURL("1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testNHCBSamples,
}, onlyOldResult)
got := query(t, aware,
got := selectSeries(t, aware,
// Without schema selector, semconv aware storage should have no effect.
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")),
)
require.Equal(t, onlyOldResult, got)
compatibleResult := query(t, aware,
compatibleResult := selectSeries(t, aware,
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("1.0.0")),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name),
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")),
@ -611,3 +617,62 @@ func TestAwareStorage(t *testing.T) {
})
})
}
func TestAwareStorage_PromQL_OverlappingSeries(t *testing.T) {
const samples = 10
testFSamples := make([]chunks.Sample, samples)
for i := range samples {
testFSamples[i] = sample{
t: int64(i),
f: float64(i),
}
}
// Remove the test label so the series are identical after semconv conversion.
seriesOld := labels.NewBuilder(testdataElementsSeriesOld).Del("test").Labels()
seriesNew := labels.NewBuilder(testdataElementsSeriesNew).Del("test").Labels()
ctx := context.TODO()
e := promql.NewEngine(promql.EngineOpts{MaxSamples: 100, Timeout: 30 * time.Second})
t.Run("rate with overlapping samples", func(t *testing.T) {
db := openTestDB(t, nil, []appendSeries{
{series: seriesOld, samples: testFSamples[:5]},
{series: seriesNew, samples: testFSamples[5:]},
})
q, err := e.NewInstantQuery(
ctx, AwareStorage(db), nil,
fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10),
)
require.NoError(t, err)
t.Cleanup(q.Close)
res := q.Exec(ctx)
require.NoError(t, res.Err)
require.NoError(t, nil, res.Warnings.AsErrors())
require.Equal(t, "{category=\"first\", fraction=\"1.243\", integer=\"1\"} => 0.9999999999999998 @[10]", res.String())
})
t.Run("rate with duplicate samples", func(t *testing.T) {
db := openTestDB(t, nil, []appendSeries{
{series: seriesOld, samples: testFSamples},
{series: seriesNew, samples: testFSamples},
})
q, err := e.NewInstantQuery(
ctx, AwareStorage(db), nil,
fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10),
)
require.NoError(t, err)
t.Cleanup(q.Close)
res := q.Exec(ctx)
require.NoError(t, res.Err)
require.NoError(t, nil, res.Warnings.AsErrors())
require.Equal(t, "{category=\"first\", fraction=\"1.243\", integer=\"1\"} => 0.9999999999999998 @[10]", res.String())
})
}

View file

@ -1,9 +1,11 @@
# Version of this file.
version: 1
# changelog contains all changes made to elements with the same semantic id.
# changelog contains all changes made to elements with the same semantic id
# sorted from the newest to the oldest.
metrics_changelog:
my_app_latency:
# my_app_latency vs my_app_latency.2
- forward:
metric_name: my_app_latency_seconds
unit: "{second}"
@ -14,6 +16,14 @@ metrics_changelog:
value_promql: "value{} * 1000"
my_app_custom_elements:
# my_app_custom_elements.2 vs my_app_custom_elements.3
- forward:
attributes:
- tag: "my_number"
backward:
attributes:
- tag: "number"
# my_app_custom_elements vs my_app_custom_elements.2
- forward:
metric_name: my_app_custom_elements_changed_total
attributes:
@ -32,5 +42,3 @@ metrics_changelog:
- value: "first"
- value: "second"
- value: "other"
my_app_some_elements_totals:

View file

@ -1,12 +1,14 @@
# Version of this file.
version: 1
# map from identity of an element to its id(s).
# map from identity of an element to its id(s), ordered from the newest to the oldest.
metrics_ids:
my_app_latency_seconds~seconds.histogram:
- id: "my_app_latency.2"
intro_version: "1.1.0" # When introduced.
intro_version: "1.1.0"
my_app_custom_elements_changed_total~elements.counter:
- id: "my_app_custom_elements.3"
intro_version: "1.2.0"
- id: "my_app_custom_elements.2"
intro_version: "1.1.0"
my_app_latency_milliseconds~milliseconds.histogram:

View file

@ -7,28 +7,28 @@ import (
)
type valueTransformer struct {
expr parser.Expr
expr []parser.Expr
}
func newValueTransformer(toPromQL string) (*valueTransformer, error) {
func (vt valueTransformer) AddPromQL(toPromQL string) (valueTransformer, error) {
p := parser.NewParser(toPromQL)
expr, err := p.ParseExpr()
if err != nil {
return nil, fmt.Errorf("can't parse %v: %w", toPromQL, err)
return vt, fmt.Errorf("can't parse %v: %w", toPromQL, err)
}
// Validate it.
if _, err = transform(expr, 0); err != nil {
return nil, err
return vt, err
}
return &valueTransformer{expr: expr}, nil
vt.expr = append(vt.expr, expr)
return vt, nil
}
func (t *valueTransformer) Transform(v float64) float64 {
if t == nil {
return v // Noop.
func (vt valueTransformer) Transform(v float64) float64 {
for _, e := range vt.expr {
// We did what we could and tested transform in constructor, skipping error here.
v, _ = transform(e, v)
}
// We did what we could and tested transform in constructor, skipping here.
v, _ = transform(t.expr, v)
return v
}

View file

@ -7,18 +7,25 @@ import (
)
func TestValueTransformer(t *testing.T) {
v, err := newValueTransformer("value{} / 1000")
v, err := valueTransformer{}.AddPromQL("value{} / 1000")
require.NoError(t, err)
require.Equal(t, float64(4), v.Transform(4000))
require.Equal(t, float64(4), v.Transform(4000))
v, err = newValueTransformer("whatever{foo=\"bar\"} * 1024")
v, err = valueTransformer{}.AddPromQL("whatever{foo=\"bar\"} * 1024")
require.NoError(t, err)
require.Equal(t, float64(81920), v.Transform(80))
require.Equal(t, float64(81920), v.Transform(80))
v, err = newValueTransformer("a{} + 15 - 44")
v, err = valueTransformer{}.AddPromQL("a{} + 15 - 44")
require.NoError(t, err)
require.Equal(t, float64(-27), v.Transform(2))
require.Equal(t, float64(-27), v.Transform(2))
// Chain things up.
v, err = valueTransformer{}.AddPromQL("value{} / 1000")
require.NoError(t, err)
v, err = v.AddPromQL("whatever{foo=\"bar\"} * 1024")
require.NoError(t, err)
require.Equal(t, float64(2048), v.Transform(2000))
}

View file

@ -408,13 +408,20 @@ func (c *genericMergeSeriesSet) Next() bool {
func (c *genericMergeSeriesSet) At() Labels {
if len(c.currentSets) == 1 {
return c.currentSets[0].At()
at := c.currentSets[0].At()
fmt.Println("DEBUG: (one) Returning", at.Labels(), "from", at)
return at
}
series := make([]Labels, 0, len(c.currentSets))
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
at := seriesSet.At()
fmt.Println("DEBUG: Got", at.Labels(), "from", at)
series = append(series, at)
}
return c.mergeFunc(series...)
// DEBUG.
l := c.mergeFunc(series...)
fmt.Println("DEBUG: Returning", l.Labels(), "from", l)
return l
}
func (c *genericMergeSeriesSet) Err() error {