mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-03 13:42:14 -04:00
semconv: Versioned Read feature.
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
f5c852af72
commit
07990ec2bd
15 changed files with 1749 additions and 0 deletions
|
|
@ -68,6 +68,7 @@ import (
|
|||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/rules"
|
||||
"github.com/prometheus/prometheus/scrape"
|
||||
"github.com/prometheus/prometheus/semconv"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/tracing"
|
||||
|
|
@ -214,6 +215,8 @@ type flagConfig struct {
|
|||
promqlEnableDelayedNameRemoval bool
|
||||
|
||||
promslogConfig promslog.Config
|
||||
|
||||
enableSemconvVersionedRead bool
|
||||
}
|
||||
|
||||
// setFeatureListOptions sets the corresponding options from the featureList.
|
||||
|
|
@ -283,6 +286,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
|
|||
case "type-and-unit-labels":
|
||||
c.scrape.EnableTypeAndUnitLabels = true
|
||||
logger.Info("Experimental type and unit labels enabled")
|
||||
case "semconv-versioned-read":
|
||||
c.enableSemconvVersionedRead = true
|
||||
logger.Info("Experimental semconv versioned read enabled")
|
||||
default:
|
||||
logger.Warn("Unknown option for --enable-feature", "option", o)
|
||||
}
|
||||
|
|
@ -728,6 +734,10 @@ func main() {
|
|||
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
|
||||
)
|
||||
|
||||
if cfg.enableSemconvVersionedRead {
|
||||
fanoutStorage = semconv.AwareStorage(fanoutStorage)
|
||||
}
|
||||
|
||||
var (
|
||||
ctxWeb, cancelWeb = context.WithCancel(context.Background())
|
||||
ctxRule = context.Background()
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -162,6 +162,7 @@ require (
|
|||
github.com/knadh/koanf/v2 v2.1.2 // indirect
|
||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/maruel/natural v1.1.1 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mdlayher/socket v0.4.1 // indirect
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -323,6 +323,8 @@ github.com/linode/linodego v1.47.0 h1:6MFNCyzWbr8Rhl4r7d5DwZLwxvFIsM4ARH6W0KS/R0
|
|||
github.com/linode/linodego v1.47.0/go.mod h1:vyklQRzZUWhFVBZdYx4dcYJU/gG9yKB9VUcUs6ub0Lk=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/maruel/natural v1.1.1 h1:Hja7XhhmvEFhcByqDoHz9QZbkWey+COd9xWfCfn1ioo=
|
||||
github.com/maruel/natural v1.1.1/go.mod h1:v+Rfd79xlw1AgVBjbO0BEQmptqb5HvL/k9GRHB7ZKEg=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
|
||||
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
|
|
|
|||
451
semconv/engine.go
Normal file
451
semconv/engine.go
Normal file
|
|
@ -0,0 +1,451 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/maruel/natural"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
const cacheTTL = 1 * time.Hour
|
||||
|
||||
type schemaEngine struct {
|
||||
// TODO(bwplotka): Implement GC logic for ttl and limits.
|
||||
cachedIDs map[string]*ids
|
||||
cacheIDsMu sync.RWMutex
|
||||
cachedChangelog map[string]*changelog
|
||||
cacheChangelogMu sync.RWMutex
|
||||
}
|
||||
|
||||
func newSchemaEngine() *schemaEngine {
|
||||
return &schemaEngine{
|
||||
cachedIDs: map[string]*ids{},
|
||||
cachedChangelog: map[string]*changelog{},
|
||||
}
|
||||
}
|
||||
|
||||
type matcherBuilder struct {
|
||||
metric labels.MetricIdentity
|
||||
other []*labels.Matcher
|
||||
}
|
||||
|
||||
func newMatcherBuilder(matchers []*labels.Matcher) (matcherBuilder, error) {
|
||||
var b matcherBuilder
|
||||
for _, m := range matchers {
|
||||
switch m.Name {
|
||||
case labels.MetricName:
|
||||
if m.Type != labels.MatchEqual {
|
||||
return b, fmt.Errorf("__name__ matcher must be equal")
|
||||
}
|
||||
b.metric.Name = m.Value
|
||||
case "__type__":
|
||||
if m.Type != labels.MatchEqual {
|
||||
return b, fmt.Errorf("__type__ matcher must be equal")
|
||||
}
|
||||
b.metric.Type = model.MetricType(m.Value)
|
||||
case "__unit__":
|
||||
if m.Type != labels.MatchEqual {
|
||||
return b, fmt.Errorf("__unit__ matcher must be equal")
|
||||
}
|
||||
b.metric.Unit = m.Value
|
||||
case schemaURLLabel:
|
||||
// Skip it as we will be querying to different versions? We could
|
||||
// make regex for registry dir at least, if that helps.
|
||||
default:
|
||||
b.other = append(b.other, m)
|
||||
}
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// ToMatchers returns a copy of matchers based on builder details.
|
||||
func (b matcherBuilder) ToMatchers(extraNameSuffix string) []*labels.Matcher {
|
||||
ret := make([]*labels.Matcher, 0, len(b.other)+3)
|
||||
|
||||
if b.metric.Name != "" {
|
||||
ret = append(ret, &labels.Matcher{
|
||||
Name: model.MetricNameLabel,
|
||||
Type: labels.MatchEqual,
|
||||
Value: b.metric.Name + extraNameSuffix,
|
||||
})
|
||||
}
|
||||
if b.metric.Type != "" && b.metric.Type != model.MetricTypeUnknown {
|
||||
ret = append(ret, &labels.Matcher{
|
||||
Name: "__type__",
|
||||
Type: labels.MatchEqual,
|
||||
Value: string(b.metric.Type),
|
||||
})
|
||||
}
|
||||
if b.metric.Unit != "" {
|
||||
ret = append(ret, &labels.Matcher{
|
||||
Name: "__unit__",
|
||||
Type: labels.MatchEqual,
|
||||
Value: b.metric.Unit,
|
||||
})
|
||||
}
|
||||
return append(ret, b.other...)
|
||||
}
|
||||
|
||||
func (e *schemaEngine) getMetricID(schemaURL string, matchers matcherBuilder) (metricID, string, error) {
|
||||
schemaVersion := path.Base(schemaURL)
|
||||
|
||||
// TODO(bwplotka): This assumes such a file structure is part of the spec.
|
||||
ids, err := e.fetchIDs(schemaIDsURL(schemaURL))
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("based on __schema_url__=%v; %w", schemaURL, err)
|
||||
}
|
||||
|
||||
var (
|
||||
vid []versionedID
|
||||
magicSuffix string
|
||||
)
|
||||
for _, suffix := range []string{"", "_bucket", "_count", "_sum"} {
|
||||
magicSuffix = suffix
|
||||
m := matchers.metric
|
||||
m.Name = strings.TrimSuffix(m.Name, magicSuffix)
|
||||
|
||||
var ok bool
|
||||
vid, ok = ids.MetricsIDs[m.String()]
|
||||
if !ok {
|
||||
// Try non-unit search.
|
||||
val, ok := ids.uniqueNameTypeToIdentity[m.String()]
|
||||
if !ok {
|
||||
// Try just name search.
|
||||
val, ok = ids.uniqueNameToIdentity[m.Name]
|
||||
if !ok {
|
||||
// Try different suffix.
|
||||
continue
|
||||
}
|
||||
}
|
||||
if val == "" {
|
||||
return "", "", fmt.Errorf("ambigous metric ID lookup for %v metric; use __type__ and __unit__ for more specific selection", m.String())
|
||||
}
|
||||
vid = ids.MetricsIDs[val]
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(vid) == 0 {
|
||||
return "", "", fmt.Errorf("can't find metric ID in %v entry for version %v; this metric (with or without magic suffixes) is not part of this schema registry", matchers.metric.String(), schemaVersion)
|
||||
}
|
||||
|
||||
for _, id := range vid {
|
||||
if !natural.Less(id.IntroVersion, schemaVersion) {
|
||||
return id.ID, magicSuffix, nil
|
||||
}
|
||||
}
|
||||
return "", "", fmt.Errorf("can't find metric ID in %v entry for version %v", matchers.metric.String(), schemaVersion)
|
||||
}
|
||||
|
||||
func (e *schemaEngine) fetchIDs(schemaIDsURL string) (_ *ids, err error) {
|
||||
e.cacheIDsMu.RLock()
|
||||
ids, ok := e.cachedIDs[schemaIDsURL]
|
||||
e.cacheIDsMu.RUnlock()
|
||||
if ok && time.Now().Sub(ids.fetchTime) < cacheTTL {
|
||||
return ids, nil
|
||||
}
|
||||
// Expired or missing.
|
||||
ids, err = fetchIDs(schemaIDsURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.cacheIDsMu.Lock()
|
||||
e.cachedIDs[schemaIDsURL] = ids
|
||||
e.cacheIDsMu.Unlock()
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (e *schemaEngine) fetchChangelog(schemaChangelogURL string) (_ *changelog, err error) {
|
||||
e.cacheChangelogMu.RLock()
|
||||
ch, ok := e.cachedChangelog[schemaChangelogURL]
|
||||
e.cacheChangelogMu.RUnlock()
|
||||
if ok && time.Now().Sub(ch.fetchTime) < cacheTTL {
|
||||
return ch, nil
|
||||
}
|
||||
// Expired or missing.
|
||||
ch, err = fetchChangelog(schemaChangelogURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.cacheChangelogMu.Lock()
|
||||
e.cachedChangelog[schemaChangelogURL] = ch
|
||||
e.cacheChangelogMu.Unlock()
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func schemaChangelogURL(schemaURL string) string {
|
||||
// NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/
|
||||
dir, _ := path.Split(schemaURL)
|
||||
return fmt.Sprintf("%v/latest/.gen/changelog.yaml", dir)
|
||||
}
|
||||
|
||||
func schemaIDsURL(schemaURL string) string {
|
||||
// NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/
|
||||
dir, _ := path.Split(schemaURL)
|
||||
return fmt.Sprintf("%v/latest/.gen/ids.yaml", dir)
|
||||
}
|
||||
|
||||
// FindVariants returns all variants for a single schematized (referenced by schema_url) metric.
|
||||
// It returns error if the given matchers does not point to a single metric or if schema or variants couldn't
|
||||
// be detected.
|
||||
func (e *schemaEngine) FindVariants(schemaURL string, originalMatchers []*labels.Matcher) (variants []*variant, _ error) {
|
||||
matchers, err := newMatcherBuilder(originalMatchers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mID, magicSuffix, err := e.getMetricID(schemaURL, matchers)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getMetricID: %w", err)
|
||||
}
|
||||
ch, err := e.fetchChangelog(schemaChangelogURL(schemaURL))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sID, rev := mID.semanticID()
|
||||
changes, ok := ch.MetricsChangelog[sID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("schema is malformed or cache is not consistent; can't find changes for semantic ID %v", sID)
|
||||
}
|
||||
|
||||
variants = append(variants, &variant{
|
||||
matchers: matchers.ToMatchers(""),
|
||||
})
|
||||
if len(changes) == 0 {
|
||||
// No changes, only one variant--the original metric.
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Revision starts with 0, then 2,3,4...
|
||||
if rev != 0 {
|
||||
rev--
|
||||
}
|
||||
|
||||
// Changelog contains changes across revisions, traverse up and down.
|
||||
variants, err = traverseChanges(changes, rev, true, matchers, magicSuffix, variants)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err)
|
||||
}
|
||||
variants, err = traverseChanges(changes, rev, false, matchers, magicSuffix, variants)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't traverse changes for semantic ID %v: %w", sID, err)
|
||||
}
|
||||
return variants, nil
|
||||
}
|
||||
|
||||
type resultTransform struct {
|
||||
to metricGroupChange
|
||||
from metricGroupChange
|
||||
vt *valueTransformer
|
||||
magicSuffix string
|
||||
}
|
||||
|
||||
type variant struct {
|
||||
matchers []*labels.Matcher
|
||||
result resultTransform
|
||||
}
|
||||
|
||||
// TODO(bwplotka): Fix known gap - we have to chain to's and from's for more traversal lenght than 1 (similar to what we do with matchers).
|
||||
func traverseChanges(changes []change, rev int, up bool, b matcherBuilder, magicSuffix string, v []*variant) ([]*variant, error) {
|
||||
var to, from metricGroupChange
|
||||
if up {
|
||||
if len(changes) <= rev {
|
||||
return v, nil
|
||||
}
|
||||
to = changes[rev].Forward
|
||||
from = changes[rev].Backward
|
||||
rev++ // up starts with the current rev, and then goes up.
|
||||
} else {
|
||||
rev--
|
||||
if rev < 0 {
|
||||
return v, nil
|
||||
}
|
||||
to = changes[rev].Backward
|
||||
from = changes[rev].Forward
|
||||
}
|
||||
|
||||
// Transform matchers.
|
||||
if to.MetricName != "" {
|
||||
b.metric.Name = to.MetricName
|
||||
}
|
||||
if to.Unit != "" {
|
||||
b.metric.Unit = to.DirectUnit()
|
||||
}
|
||||
for a := range to.Attributes {
|
||||
aTo := to.Attributes[a]
|
||||
aFrom := from.Attributes[a]
|
||||
// TODO(bwplotka): In current logic, tag MUST be specified,
|
||||
// otherwise the engine would need to fetch full metric definition and
|
||||
// to get the tag -> ID of attribute (or separate attribute tag to IDs index).
|
||||
for m := range b.other {
|
||||
// Find the attribute under the "old" name.
|
||||
if b.other[m].Name == aFrom.Tag {
|
||||
old := b.other[m]
|
||||
value := b.other[m].Value
|
||||
for member := range aTo.Members {
|
||||
if old.Matches(aFrom.Members[member].Value) {
|
||||
// TODO(bwplotka): Pretty yolo e.g. should we also replace partial use in regex?
|
||||
value = strings.Replace(value, aFrom.Members[member].Value, aTo.Members[member].Value, -1)
|
||||
}
|
||||
}
|
||||
b.other[m] = labels.MustNewMatcher(old.Type, aTo.Tag, value)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var vt *valueTransformer
|
||||
if from.ValuePromQL != "" {
|
||||
var err error
|
||||
vt, err = newValueTransformer(from.ValuePromQL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return traverseChanges(changes, rev, up, b, magicSuffix, append(v, &variant{
|
||||
matchers: b.ToMatchers(magicSuffix),
|
||||
result: resultTransform{
|
||||
// Transformation from -> to is for matchers, for results we need to revert that
|
||||
// transformation, so below code uses to -> from.
|
||||
from: to,
|
||||
to: from,
|
||||
vt: vt,
|
||||
magicSuffix: magicSuffix,
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
type transformingSeriesSet struct {
|
||||
storage.SeriesSet
|
||||
|
||||
result resultTransform
|
||||
}
|
||||
|
||||
// SeriesSet returns variant SeriesSet that transforms data on the fly
|
||||
// based on variant to and from change details.
|
||||
func (v *variant) SeriesSet(s storage.SeriesSet) storage.SeriesSet {
|
||||
return &transformingSeriesSet{SeriesSet: s, result: v.result}
|
||||
}
|
||||
|
||||
type transformingSeries struct {
|
||||
storage.Series
|
||||
|
||||
lbls labels.Labels
|
||||
|
||||
result resultTransform
|
||||
}
|
||||
|
||||
func (s *transformingSeriesSet) At() storage.Series {
|
||||
at := s.SeriesSet.At()
|
||||
return &transformingSeries{Series: at, lbls: at.Labels(), result: s.result}
|
||||
}
|
||||
|
||||
func (s *transformingSeries) Labels() labels.Labels {
|
||||
typ := s.lbls.MetricIdentity().Type
|
||||
|
||||
builder := labels.NewBuilder(s.lbls)
|
||||
builder.Range(func(l labels.Label) {
|
||||
|
||||
nameswitch:
|
||||
switch l.Name {
|
||||
case labels.MetricName:
|
||||
if s.result.to.MetricName != "" {
|
||||
builder.Set(l.Name, s.result.to.MetricName+s.result.magicSuffix)
|
||||
}
|
||||
case "__type__":
|
||||
return
|
||||
case schemaURLLabel:
|
||||
// Explicitly remove __schema_url__ as that would be misleading.
|
||||
builder.Del(l.Name)
|
||||
case "__unit__":
|
||||
if s.result.to.Unit != "" {
|
||||
builder.Set(l.Name, strings.Trim(s.result.to.Unit, "{}"))
|
||||
}
|
||||
case "le":
|
||||
if typ == model.MetricTypeHistogram {
|
||||
val, err := strconv.ParseFloat(l.Value, 64)
|
||||
if err != nil {
|
||||
fmt.Println("ERROR", err)
|
||||
}
|
||||
builder.Set(l.Name, model.FloatString(s.result.vt.Transform(val)).String())
|
||||
}
|
||||
default:
|
||||
for a := range s.result.to.Attributes {
|
||||
if l.Name != s.result.from.Attributes[a].Tag {
|
||||
continue
|
||||
}
|
||||
builder.Del(l.Name)
|
||||
for m := range s.result.from.Attributes[a].Members {
|
||||
if l.Value != s.result.from.Attributes[a].Members[m].Value {
|
||||
continue
|
||||
}
|
||||
builder.Set(s.result.to.Attributes[a].Tag, s.result.to.Attributes[a].Members[m].Value)
|
||||
break nameswitch
|
||||
}
|
||||
builder.Set(s.result.to.Attributes[a].Tag, l.Value)
|
||||
break nameswitch
|
||||
}
|
||||
}
|
||||
})
|
||||
return builder.Labels()
|
||||
}
|
||||
|
||||
type transformingIterator struct {
|
||||
chunkenc.Iterator
|
||||
|
||||
typ model.MetricType
|
||||
result resultTransform
|
||||
}
|
||||
|
||||
func (s *transformingSeries) Iterator(i chunkenc.Iterator) chunkenc.Iterator {
|
||||
return &transformingIterator{Iterator: s.Series.Iterator(i), typ: s.lbls.MetricIdentity().Type, result: s.result}
|
||||
}
|
||||
|
||||
func (i *transformingIterator) At() (int64, float64) {
|
||||
t, v := i.Iterator.At()
|
||||
// TODO(bwplotka): Do the same for summaries.
|
||||
if i.typ == model.MetricTypeHistogram && (i.result.magicSuffix == "_count" || i.result.magicSuffix == "_bucket") {
|
||||
return t, v
|
||||
}
|
||||
return t, i.result.vt.Transform(v)
|
||||
}
|
||||
|
||||
func (i *transformingIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
|
||||
t, hist := i.Iterator.AtHistogram(h)
|
||||
// TODO: You can't really scale native histograms with exponential scheme. Handle this (error, approx, validation).
|
||||
|
||||
if hist.UsesCustomBuckets() {
|
||||
hist = hist.Copy()
|
||||
hist.Sum = i.result.vt.Transform(hist.Sum)
|
||||
for cvi := range hist.CustomValues {
|
||||
hist.CustomValues[cvi] = i.result.vt.Transform(hist.CustomValues[cvi])
|
||||
}
|
||||
}
|
||||
return t, hist
|
||||
}
|
||||
|
||||
func (i *transformingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||
t, hist := i.Iterator.AtFloatHistogram(fh)
|
||||
// TODO: You can't really scale native histograms with exponential scheme. Handle this (error, approx, validation).
|
||||
|
||||
if hist.UsesCustomBuckets() {
|
||||
hist = hist.Copy()
|
||||
hist.Sum = i.result.vt.Transform(hist.Sum)
|
||||
for cvi := range hist.CustomValues {
|
||||
hist.CustomValues[cvi] = i.result.vt.Transform(hist.CustomValues[cvi])
|
||||
}
|
||||
}
|
||||
return t, hist
|
||||
}
|
||||
127
semconv/engine_test.go
Normal file
127
semconv/engine_test.go
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestEngine_FindVariants(t *testing.T) {
|
||||
for _, tcase := range []struct {
|
||||
schemaURL string
|
||||
matchers []*labels.Matcher
|
||||
|
||||
expectedVariants []*variant
|
||||
expectedErr error
|
||||
}{
|
||||
// TODO(bwplotka): Add only original variant case.
|
||||
{
|
||||
schemaURL: "./testdata/v1.1.0", matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/v1.1.0"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
expectedVariants: []*variant{
|
||||
{
|
||||
// Original.
|
||||
matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
},
|
||||
{
|
||||
matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "__unit__", "milliseconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
result: resultTransform{
|
||||
to: testdataLatencyChanges[0].Forward,
|
||||
from: testdataLatencyChanges[0].Backward,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
schemaURL: "./testdata/v1.0.0", matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/v1.0.0"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
expectedVariants: []*variant{
|
||||
{
|
||||
// Original.
|
||||
matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
},
|
||||
{
|
||||
matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "__unit__", "seconds"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
result: resultTransform{
|
||||
to: testdataLatencyChanges[0].Backward,
|
||||
from: testdataLatencyChanges[0].Forward,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// TODO(bwplotka): Test ambiguous matcher errors etc.
|
||||
{
|
||||
schemaURL: "./testdata/v1.1.0", matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, "./testdata/v1.1.0"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_changed_total"),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"),
|
||||
},
|
||||
expectedVariants: []*variant{
|
||||
{
|
||||
// Original.
|
||||
matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_changed_total"),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"),
|
||||
},
|
||||
},
|
||||
{
|
||||
matchers: []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_custom_elements_total"),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", "1.2"),
|
||||
},
|
||||
result: resultTransform{
|
||||
to: testdataElementsChanges[0].Forward,
|
||||
from: testdataElementsChanges[0].Backward,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
} {
|
||||
|
||||
t.Run("", func(t *testing.T) {
|
||||
e := newSchemaEngine()
|
||||
got, err := e.FindVariants(tcase.schemaURL, tcase.matchers)
|
||||
if tcase.expectedErr != nil {
|
||||
require.ErrorContains(t, err, tcase.expectedErr.Error())
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
testutil.RequireEqualWithOptions(t, tcase.expectedVariants, got, []cmp.Option{
|
||||
cmp.Comparer(func(a, b *labels.Matcher) bool {
|
||||
return a.Name == b.Name && a.Type == b.Type && a.Value == b.Value
|
||||
}),
|
||||
cmp.AllowUnexported(variant{}),
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
160
semconv/gen_files.go
Normal file
160
semconv/gen_files.go
Normal file
|
|
@ -0,0 +1,160 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type metricID string
|
||||
|
||||
type semanticMetricID string
|
||||
|
||||
func (id metricID) semanticID() (_ semanticMetricID, revision int) {
|
||||
parts := strings.Split(string(id), ".")
|
||||
if len(parts) == 0 {
|
||||
return semanticMetricID(id), 0
|
||||
}
|
||||
var err error
|
||||
revision, err = strconv.Atoi(parts[len(parts)-1])
|
||||
if err != nil {
|
||||
return semanticMetricID(id), 0
|
||||
}
|
||||
// Number, assume revision.
|
||||
return semanticMetricID(strings.Join(parts[:len(parts)-1], ".")), revision
|
||||
}
|
||||
|
||||
type changelog struct {
|
||||
Version int `yaml:"version"`
|
||||
|
||||
MetricsChangelog map[semanticMetricID][]change `yaml:"metrics_changelog"`
|
||||
|
||||
fetchTime time.Time
|
||||
}
|
||||
|
||||
type change struct {
|
||||
Forward metricGroupChange
|
||||
Backward metricGroupChange
|
||||
}
|
||||
|
||||
// metricGroupChange represents semconv metric group.
|
||||
// NOTE(bwplotka): Only implementing fields that matter for querying.
|
||||
type metricGroupChange struct {
|
||||
MetricName string `yaml:"metric_name"`
|
||||
Unit string `yaml:"unit"`
|
||||
ValuePromQL string `yaml:"value_promql"`
|
||||
Attributes []attribute `yaml:"attributes"`
|
||||
}
|
||||
|
||||
func (m metricGroupChange) DirectUnit() string {
|
||||
return strings.TrimSuffix(strings.TrimPrefix(m.Unit, "{"), "}")
|
||||
}
|
||||
|
||||
type attribute struct {
|
||||
Tag string `yaml:"tag"`
|
||||
Members []attributeMember `yaml:"members"`
|
||||
}
|
||||
|
||||
type attributeMember struct {
|
||||
Value string `yaml:"value"`
|
||||
}
|
||||
|
||||
func fetchChangelog(schemaChangelogURL string) (_ *changelog, err error) {
|
||||
ch := &changelog{}
|
||||
if err := fetchAndUnmarshal(schemaChangelogURL, ch); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
type ids struct {
|
||||
Version int `yaml:"version"`
|
||||
|
||||
MetricsIDs map[string][]versionedID `yaml:"metrics_ids"`
|
||||
uniqueNameToIdentity map[string]string
|
||||
uniqueNameTypeToIdentity map[string]string
|
||||
|
||||
fetchTime time.Time
|
||||
}
|
||||
|
||||
func fetchIDs(schemaIDsURL string) (_ *ids, err error) {
|
||||
i := &ids{
|
||||
uniqueNameTypeToIdentity: make(map[string]string),
|
||||
uniqueNameToIdentity: make(map[string]string),
|
||||
}
|
||||
if err := fetchAndUnmarshal(schemaIDsURL, i); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for id := range i.MetricsIDs {
|
||||
var name, nameType string
|
||||
// Parse identity in a form of name~unit.type or name.type.
|
||||
parts := strings.Split(id, "~")
|
||||
if len(parts) > 1 {
|
||||
name = parts[0]
|
||||
|
||||
unitAndType := strings.TrimPrefix(id, name+"~")
|
||||
parts = strings.Split(unitAndType, ".")
|
||||
nameType = name + "." + parts[len(parts)-1]
|
||||
} else {
|
||||
parts := strings.Split(id, ".")
|
||||
name = parts[0]
|
||||
nameType = id
|
||||
}
|
||||
|
||||
if _, ok := i.uniqueNameToIdentity[name]; ok {
|
||||
// Not unique, put sentinel "" which will trigger error.
|
||||
i.uniqueNameToIdentity[name] = ""
|
||||
} else {
|
||||
i.uniqueNameToIdentity[name] = id
|
||||
}
|
||||
|
||||
if _, ok := i.uniqueNameTypeToIdentity[nameType]; ok {
|
||||
// Not unique, put sentinel "" which will trigger error.
|
||||
i.uniqueNameTypeToIdentity[nameType] = ""
|
||||
} else {
|
||||
i.uniqueNameTypeToIdentity[nameType] = id
|
||||
}
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
type versionedID struct {
|
||||
ID metricID `yaml:"id"`
|
||||
IntroVersion string `yaml:"intro_version"`
|
||||
}
|
||||
|
||||
func fetchAndUnmarshal[T any](url string, out *T) (err error) {
|
||||
var b []byte
|
||||
if strings.HasPrefix(url, "http") {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("http fetch %v: %w", url, err)
|
||||
}
|
||||
if resp.StatusCode/100 != 2 {
|
||||
// TODO(bwplotka): Print potential body?
|
||||
return fmt.Errorf("http fetch %v, got non-200 status: %v", url, resp.StatusCode)
|
||||
}
|
||||
|
||||
// TODO(bwplotka): Add limit.
|
||||
b, err = io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read all from http %v: %w", url, err)
|
||||
}
|
||||
} else {
|
||||
b, err = os.ReadFile(url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read all from file %v: %w", url, err)
|
||||
}
|
||||
}
|
||||
if err := yaml.Unmarshal(b, out); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
125
semconv/gen_files_test.go
Normal file
125
semconv/gen_files_test.go
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
testdataElementsChanges = []change{
|
||||
{
|
||||
Forward: metricGroupChange{MetricName: "my_app_custom_elements_changed_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}, {Tag: "class", Members: []attributeMember{{Value: "FIRST"}, {Value: "SECOND"}, {Value: "OTHER"}}}}},
|
||||
Backward: metricGroupChange{MetricName: "my_app_custom_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "integer"}, {Tag: "category", Members: []attributeMember{{Value: "first"}, {Value: "second"}, {Value: "other"}}}}},
|
||||
},
|
||||
}
|
||||
testdataLatencyChanges = []change{
|
||||
{
|
||||
Forward: metricGroupChange{MetricName: "my_app_latency_seconds_total", Unit: "{seconds}", ValuePromQL: "$old / 1000"},
|
||||
Backward: metricGroupChange{MetricName: "my_app_latency_milliseconds_total", Unit: "{milliseconds}", ValuePromQL: "$new * 1000"},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func TestFetchChangelog(t *testing.T) {
|
||||
expected := &changelog{
|
||||
Version: 1,
|
||||
MetricsChangelog: map[semanticMetricID][]change{
|
||||
"my_app_custom_elements": testdataElementsChanges,
|
||||
"my_app_latency": testdataLatencyChanges,
|
||||
"my_app_some_elements_totals": nil,
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("local", func(t *testing.T) {
|
||||
got, err := fetchChangelog("./testdata/latest/.gen/changelog.yaml")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, got)
|
||||
})
|
||||
// TODO(bwplotka): Move to something Prometheus owns e.g. internal Prometheus repo path.
|
||||
t.Run("http", func(t *testing.T) {
|
||||
got, err := fetchChangelog("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/v1.1.0/.gen/changelog.yaml")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, got)
|
||||
})
|
||||
t.Run("http-custom", func(t *testing.T) {
|
||||
got, err := fetchChangelog("https://bwplotka.dev/semconv/latest/.gen/changelog.yaml")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, got)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFetchIDs(t *testing.T) {
|
||||
expected := &ids{
|
||||
Version: 1,
|
||||
MetricsIDs: map[string][]versionedID{
|
||||
"my_app_custom_elements_changed_total.counter": {
|
||||
{
|
||||
ID: "my_app_custom_elements.2",
|
||||
IntroVersion: "v1.1.0",
|
||||
},
|
||||
},
|
||||
"my_app_custom_elements_total.counter": {
|
||||
{
|
||||
ID: "my_app_custom_elements",
|
||||
IntroVersion: "v1.0.0",
|
||||
},
|
||||
},
|
||||
"my_app_latency_milliseconds_total~milliseconds.histogram": {
|
||||
{
|
||||
ID: "my_app_latency",
|
||||
IntroVersion: "v1.0.0",
|
||||
},
|
||||
},
|
||||
"my_app_latency_seconds_total~seconds.histogram": {
|
||||
{ID: "my_app_latency.2",
|
||||
IntroVersion: "v1.1.0"},
|
||||
},
|
||||
"my_app_some_elements_totals~gauge": {
|
||||
{
|
||||
ID: "my_app_some_elements",
|
||||
IntroVersion: "v1.0.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
uniqueNameToIdentity: map[string]string{
|
||||
"my_app_custom_elements_changed_total": "my_app_custom_elements_changed_total.counter",
|
||||
"my_app_custom_elements_total": "my_app_custom_elements_total.counter",
|
||||
"my_app_latency_milliseconds_total": "my_app_latency_milliseconds_total~milliseconds.histogram",
|
||||
"my_app_latency_seconds_total": "my_app_latency_seconds_total~seconds.histogram",
|
||||
"my_app_some_elements_totals": "my_app_some_elements_totals.gauge",
|
||||
},
|
||||
uniqueNameTypeToIdentity: map[string]string{
|
||||
"my_app_custom_elements_changed_total.counter": "my_app_custom_elements_changed_total.counter",
|
||||
"my_app_custom_elements_total.counter": "my_app_custom_elements_total.counter",
|
||||
"my_app_latency_milliseconds_total.histogram": "my_app_latency_milliseconds_total~milliseconds.histogram",
|
||||
"my_app_latency_seconds_total.histogram": "my_app_latency_seconds_total~seconds.histogram",
|
||||
"my_app_some_elements_totals.gauge": "my_app_some_elements_totals.gauge",
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("local", func(t *testing.T) {
|
||||
got, err := fetchIDs("./testdata/latest/.gen/ids.yaml")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, got)
|
||||
})
|
||||
t.Run("http", func(t *testing.T) {
|
||||
got, err := fetchIDs("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/latest/.gen/ids.yaml")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, got)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSemanticMetricID(t *testing.T) {
|
||||
gotID, gotRev := metricID("my_app_custom_elements").semanticID()
|
||||
require.Equal(t, semanticMetricID("my_app_custom_elements"), gotID)
|
||||
require.Equal(t, 0, gotRev)
|
||||
|
||||
gotID, gotRev = metricID("my_app_custom_elements.yolo").semanticID()
|
||||
require.Equal(t, semanticMetricID("my_app_custom_elements.yolo"), gotID)
|
||||
require.Equal(t, 0, gotRev)
|
||||
|
||||
gotID, gotRev = metricID("my_app_custom_elements.2").semanticID()
|
||||
require.Equal(t, semanticMetricID("my_app_custom_elements"), gotID)
|
||||
require.Equal(t, 2, gotRev)
|
||||
}
|
||||
117
semconv/storage.go
Normal file
117
semconv/storage.go
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
const schemaURLLabel = "__schema_url__"
|
||||
|
||||
// AwareStorage wraps given storage with a semconv awareness that
|
||||
// performs versioned read when __schema_url__ matcher is provided.
|
||||
// TODO(bwplotka): Technically we only need Querier?
|
||||
func AwareStorage(s storage.Storage) storage.Storage {
|
||||
return &awareStorage{Storage: s, engine: newSchemaEngine()}
|
||||
}
|
||||
|
||||
type awareStorage struct {
|
||||
storage.Storage
|
||||
|
||||
engine *schemaEngine
|
||||
}
|
||||
|
||||
type awareQuerier struct {
|
||||
storage.Querier
|
||||
|
||||
engine *schemaEngine
|
||||
}
|
||||
|
||||
func (s *awareStorage) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
q, err := s.Storage.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &awareQuerier{Querier: q, engine: s.engine}, nil
|
||||
}
|
||||
|
||||
type annotatedSeriesSet struct {
|
||||
storage.SeriesSet
|
||||
|
||||
warning string
|
||||
}
|
||||
|
||||
func annotateSeriesSet(s storage.SeriesSet, warning string) storage.SeriesSet {
|
||||
return &annotatedSeriesSet{warning: warning, SeriesSet: s}
|
||||
}
|
||||
|
||||
func (s *annotatedSeriesSet) Warnings() annotations.Annotations {
|
||||
got := s.SeriesSet.Warnings()
|
||||
return got.Add(errors.New(s.warning))
|
||||
}
|
||||
|
||||
func (q *awareQuerier) Select(ctx context.Context, sort bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
var schemaURL string
|
||||
|
||||
for _, m := range matchers {
|
||||
if m.Name != schemaURLLabel {
|
||||
continue
|
||||
}
|
||||
if schemaURL != "" {
|
||||
return annotateSeriesSet(
|
||||
q.Querier.Select(ctx, sort, hints, matchers...),
|
||||
fmt.Sprintf("schema: __schema_url__ matcher was used more than once, schematization logic is skipped for %v", matchers),
|
||||
)
|
||||
}
|
||||
if m.Type != labels.MatchEqual {
|
||||
return annotateSeriesSet(
|
||||
q.Querier.Select(ctx, sort, hints, matchers...),
|
||||
fmt.Sprintf("schema: __schema_url__ matcher is ambigious (not equal type), schematization logic is skipped for %v", matchers),
|
||||
)
|
||||
}
|
||||
schemaURL = m.Value
|
||||
}
|
||||
if schemaURL == "" {
|
||||
return q.Querier.Select(ctx, sort, hints, matchers...)
|
||||
}
|
||||
|
||||
variants, err := q.engine.FindVariants(schemaURL, matchers)
|
||||
if err != nil {
|
||||
return annotateSeriesSet(
|
||||
q.Querier.Select(ctx, sort, hints, matchers...),
|
||||
fmt.Errorf("schema: failed to find variants %w, schematization logic is skipped for %v", err, matchers).Error(),
|
||||
)
|
||||
}
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
seriesSetChan = make(chan storage.SeriesSet)
|
||||
seriesSet = make([]storage.SeriesSet, 0, len(variants))
|
||||
)
|
||||
|
||||
// TODO(bwplotka): Async limit?
|
||||
// Lookup alternative variants.
|
||||
for _, v := range variants {
|
||||
wg.Add(1)
|
||||
go func(m []*labels.Matcher) {
|
||||
defer wg.Done()
|
||||
|
||||
// We need to sort for NewMergeSeriesSet to work.
|
||||
seriesSetChan <- v.SeriesSet(q.Querier.Select(ctx, true, hints, m...))
|
||||
}(v.matchers)
|
||||
}
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(seriesSetChan)
|
||||
}()
|
||||
|
||||
for r := range seriesSetChan {
|
||||
seriesSet = append(seriesSet, r)
|
||||
}
|
||||
return storage.NewMergeSeriesSet(seriesSet, 0, storage.ChainedSeriesMerge)
|
||||
}
|
||||
611
semconv/storage_test.go
Normal file
611
semconv/storage_test.go
Normal file
|
|
@ -0,0 +1,611 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
)
|
||||
|
||||
func testSchemaURL(version string) string {
|
||||
//return "https://bwplotka.dev/semconv/" + version
|
||||
return "./testdata/" + version
|
||||
}
|
||||
|
||||
var (
|
||||
testdataElementsSeriesOld = labels.FromStrings(
|
||||
"__name__", "my_app_custom_elements_total",
|
||||
"__schema_url__", testSchemaURL("v1.0.0"),
|
||||
"__type__", "counter",
|
||||
"integer", "1",
|
||||
"category", "first",
|
||||
"fraction", "1.243",
|
||||
"test", "old",
|
||||
)
|
||||
testdataElementsSeriesNew = labels.FromStrings(
|
||||
"__name__", "my_app_custom_elements_changed_total",
|
||||
"__schema_url__", testSchemaURL("v1.1.0"),
|
||||
"__type__", "counter",
|
||||
"number", "1",
|
||||
"class", "FIRST",
|
||||
"fraction", "1.243",
|
||||
"test", "new",
|
||||
)
|
||||
testdataLatencySeriesOld = labels.FromStrings(
|
||||
"__name__", "my_app_latency_milliseconds",
|
||||
"__schema_url__", testSchemaURL("v1.0.0"),
|
||||
"__type__", "histogram",
|
||||
"__unit__", "milliseconds",
|
||||
"code", "200",
|
||||
"test", "old",
|
||||
)
|
||||
testdataLatencySeriesNew = labels.FromStrings(
|
||||
"__name__", "my_app_latency_seconds",
|
||||
"__schema_url__", testSchemaURL("v1.1.0"),
|
||||
"__type__", "histogram",
|
||||
"__unit__", "seconds",
|
||||
"code", "200",
|
||||
"test", "new",
|
||||
)
|
||||
)
|
||||
|
||||
type appendSeries struct {
|
||||
series labels.Labels
|
||||
samples []chunks.Sample
|
||||
}
|
||||
|
||||
func openTestDB(t testing.TB, opts *tsdb.Options, dataToAppend []appendSeries) (db *tsdb.DB) {
|
||||
t.Helper()
|
||||
|
||||
tmpdir := t.TempDir()
|
||||
if opts == nil {
|
||||
opts = tsdb.DefaultOptions()
|
||||
}
|
||||
opts.EnableNativeHistograms = true
|
||||
|
||||
db, err := tsdb.Open(tmpdir, nil, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
_ = db.Close
|
||||
})
|
||||
|
||||
// Append test data.
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for _, a := range dataToAppend {
|
||||
for _, s := range a.samples {
|
||||
if s.H() != nil || s.FH() != nil {
|
||||
_, err = app.AppendHistogram(0, a.series, s.T(), s.H(), nil)
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
_, err = app.Append(0, a.series, s.T(), s.F())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
return db
|
||||
}
|
||||
|
||||
func testNHCB(i int) *histogram.Histogram {
|
||||
return &histogram.Histogram{
|
||||
Schema: histogram.CustomBucketsSchema,
|
||||
CounterResetHint: func() histogram.CounterResetHint {
|
||||
if i == 0 {
|
||||
return 0
|
||||
}
|
||||
return 2
|
||||
}(),
|
||||
Count: 10 + uint64(i),
|
||||
Sum: 2.7 + float64(i),
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 0},
|
||||
{Offset: 0, Length: 3},
|
||||
},
|
||||
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0 + int64(i)},
|
||||
CustomValues: []float64{5, 10, 20, 50, 100, 500},
|
||||
}
|
||||
}
|
||||
|
||||
type sample struct {
|
||||
t int64
|
||||
f float64
|
||||
h *histogram.Histogram
|
||||
fh *histogram.FloatHistogram
|
||||
}
|
||||
|
||||
func newSample(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
|
||||
return sample{t, v, h, fh}
|
||||
}
|
||||
|
||||
func (s sample) T() int64 { return s.t }
|
||||
func (s sample) F() float64 { return s.f }
|
||||
func (s sample) H() *histogram.Histogram { return s.h }
|
||||
func (s sample) FH() *histogram.FloatHistogram { return s.fh }
|
||||
|
||||
func (s sample) Type() chunkenc.ValueType {
|
||||
switch {
|
||||
case s.h != nil:
|
||||
return chunkenc.ValHistogram
|
||||
case s.fh != nil:
|
||||
return chunkenc.ValFloatHistogram
|
||||
default:
|
||||
return chunkenc.ValFloat
|
||||
}
|
||||
}
|
||||
|
||||
func (s sample) Copy() chunks.Sample {
|
||||
c := sample{t: s.t, f: s.f}
|
||||
if s.h != nil {
|
||||
c.h = s.h.Copy()
|
||||
}
|
||||
if s.fh != nil {
|
||||
c.fh = s.fh.Copy()
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
|
||||
t.Helper()
|
||||
|
||||
ss := q.Select(context.Background(), false, nil, matchers...)
|
||||
|
||||
var it chunkenc.Iterator
|
||||
result := map[string][]chunks.Sample{}
|
||||
for ss.Next() {
|
||||
series := ss.At()
|
||||
|
||||
it = series.Iterator(it)
|
||||
samples, err := storage.ExpandSamples(it, newSample)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, it.Err())
|
||||
|
||||
if len(samples) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
name := series.Labels().String()
|
||||
result[name] = samples
|
||||
}
|
||||
require.NoError(t, ss.Err())
|
||||
require.Empty(t, ss.Warnings())
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func scaleSamples(samples []chunks.Sample, up bool, value float64) []chunks.Sample {
|
||||
ret := make([]chunks.Sample, len(samples))
|
||||
for i, s := range samples {
|
||||
if fh := s.FH(); fh != nil {
|
||||
if !fh.UsesCustomBuckets() {
|
||||
panic("can't scale native histograms ")
|
||||
}
|
||||
fh = fh.Copy()
|
||||
if up {
|
||||
fh.Sum = fh.Sum * value
|
||||
for cvi := range fh.CustomValues {
|
||||
fh.CustomValues[cvi] = fh.CustomValues[cvi] * value
|
||||
}
|
||||
} else {
|
||||
fh.Sum = fh.Sum / value
|
||||
for cvi := range fh.CustomValues {
|
||||
fh.CustomValues[cvi] = fh.CustomValues[cvi] / value
|
||||
}
|
||||
}
|
||||
ret[i] = sample{t: s.T(), fh: fh}
|
||||
continue
|
||||
}
|
||||
if h := s.H(); h != nil {
|
||||
if !h.UsesCustomBuckets() {
|
||||
panic("can't scale native histograms ")
|
||||
}
|
||||
h = h.Copy()
|
||||
if up {
|
||||
h.Sum = h.Sum * value
|
||||
for cvi := range h.CustomValues {
|
||||
h.CustomValues[cvi] = h.CustomValues[cvi] * value
|
||||
}
|
||||
} else {
|
||||
h.Sum = h.Sum / value
|
||||
for cvi := range h.CustomValues {
|
||||
h.CustomValues[cvi] = h.CustomValues[cvi] / value
|
||||
}
|
||||
}
|
||||
ret[i] = sample{t: s.T(), h: h}
|
||||
continue
|
||||
}
|
||||
if up {
|
||||
ret[i] = sample{t: s.T(), f: s.F() * value}
|
||||
} else {
|
||||
ret[i] = sample{t: s.T(), f: s.F() / value}
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func TestScaleSamples(t *testing.T) {
|
||||
t.Run("nhcb", func(t *testing.T) {
|
||||
testNHCBSamples := make([]chunks.Sample, 10)
|
||||
for i := range 10 {
|
||||
testNHCBSamples[i] = sample{
|
||||
t: int64(i),
|
||||
h: testNHCB(i),
|
||||
}
|
||||
}
|
||||
scaled := scaleSamples(testNHCBSamples, true, 1000)
|
||||
require.NotEqual(t, testNHCBSamples, scaled)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAwareStorage(t *testing.T) {
|
||||
const samples = 10
|
||||
|
||||
testFSamples := make([]chunks.Sample, samples)
|
||||
for i := range samples {
|
||||
testFSamples[i] = sample{
|
||||
t: int64(i),
|
||||
f: float64(i),
|
||||
}
|
||||
}
|
||||
testNHCBSamples := make([]chunks.Sample, samples)
|
||||
for i := range samples {
|
||||
testNHCBSamples[i] = sample{
|
||||
t: int64(i),
|
||||
h: testNHCB(i),
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("counter", func(t *testing.T) {
|
||||
db := openTestDB(t, nil, []appendSeries{
|
||||
{series: testdataElementsSeriesOld, samples: testFSamples},
|
||||
{series: testdataElementsSeriesNew, samples: testFSamples},
|
||||
})
|
||||
|
||||
notAware, err := db.Querier(0, samples)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, notAware.Close())
|
||||
})
|
||||
aware, err := AwareStorage(db).Querier(0, samples)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, aware.Close())
|
||||
})
|
||||
|
||||
t.Run("backward", func(t *testing.T) {
|
||||
onlyNewResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesNew.Get("fraction")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_custom_elements_changed_total", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="counter", class="FIRST", fraction="1.243", number="1", test="new"}`: testFSamples,
|
||||
}, onlyNewResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesNew.Get("fraction")),
|
||||
)
|
||||
require.Equal(t, onlyNewResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesNew.Get(schemaURLLabel)),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesNew.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "number", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "class", "FIRST|OTHER"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesNew.Get("fraction")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_custom_elements_changed_total", __type__="counter", class="FIRST", fraction="1.243", number="1", test="new"}`: testFSamples,
|
||||
`{__name__="my_app_custom_elements_changed_total", __type__="counter", class="FIRST", fraction="1.243", number="1", test="old"}`: testFSamples,
|
||||
}, compatibleResult)
|
||||
})
|
||||
t.Run("forward", func(t *testing.T) {
|
||||
onlyOldResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesOld.Get("fraction")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_custom_elements_total", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="counter", category="first", fraction="1.243", integer="1", test="old"}`: testFSamples,
|
||||
}, onlyOldResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesOld.Get("fraction")),
|
||||
)
|
||||
require.Equal(t, onlyOldResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testdataElementsSeriesOld.Get(schemaURLLabel)),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataElementsSeriesOld.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchNotEqual, "integer", "2"),
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "category", "first|other"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "fraction", testdataElementsSeriesOld.Get("fraction")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_custom_elements_total", __type__="counter", category="first", fraction="1.243", integer="1", test="old"}`: testFSamples,
|
||||
`{__name__="my_app_custom_elements_total", __type__="counter", category="first", fraction="1.243", integer="1", test="new"}`: testFSamples,
|
||||
}, compatibleResult)
|
||||
})
|
||||
})
|
||||
t.Run("classic histogram", func(t *testing.T) {
|
||||
var a []appendSeries
|
||||
for _, m := range []labels.Labels{testdataLatencySeriesNew, testdataLatencySeriesOld} {
|
||||
b := labels.NewBuilder(m)
|
||||
if m.Get("test") == "new" {
|
||||
b.Set("le", "10")
|
||||
} else {
|
||||
b.Set("le", "10000")
|
||||
}
|
||||
|
||||
b.Set("__name__", m.MetricIdentity().Name+"_bucket")
|
||||
a = append(a, appendSeries{series: b.Labels(), samples: testFSamples})
|
||||
b = labels.NewBuilder(m)
|
||||
b.Set("__name__", m.MetricIdentity().Name+"_count")
|
||||
a = append(a, appendSeries{series: b.Labels(), samples: testFSamples})
|
||||
b = labels.NewBuilder(m)
|
||||
b.Set("__name__", m.MetricIdentity().Name+"_sum")
|
||||
a = append(a, appendSeries{series: b.Labels(), samples: testFSamples})
|
||||
}
|
||||
db := openTestDB(t, nil, a)
|
||||
|
||||
notAware, err := db.Querier(0, samples)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, notAware.Close())
|
||||
})
|
||||
aware, err := AwareStorage(db).Querier(0, samples)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, aware.Close())
|
||||
})
|
||||
|
||||
t.Run("backward", func(t *testing.T) {
|
||||
t.Run("_bucket", func(t *testing.T) {
|
||||
onlyNewResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds_bucket", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", le="10", test="new"}`: testFSamples,
|
||||
}, onlyNewResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, onlyNewResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_bucket"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds_bucket", __type__="histogram", __unit__="seconds", code="200", le="10", test="new"}`: testFSamples,
|
||||
`{__name__="my_app_latency_seconds_bucket", __type__="histogram", __unit__="seconds", code="200", le="10", test="old"}`: testFSamples,
|
||||
}, compatibleResult)
|
||||
})
|
||||
t.Run("_count", func(t *testing.T) {
|
||||
onlyNewResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds_count", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
|
||||
}, onlyNewResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, onlyNewResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_count"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds_count", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples,
|
||||
`{__name__="my_app_latency_seconds_count", __type__="histogram", __unit__="seconds", code="200", test="old"}`: testFSamples,
|
||||
}, compatibleResult)
|
||||
})
|
||||
t.Run("_sum", func(t *testing.T) {
|
||||
onlyNewResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds_sum", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
|
||||
}, onlyNewResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, onlyNewResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_seconds_sum"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds_sum", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testFSamples,
|
||||
`{__name__="my_app_latency_seconds_sum", __type__="histogram", __unit__="seconds", code="200", test="old"}`: scaleSamples(testFSamples, false, 1000),
|
||||
}, compatibleResult)
|
||||
})
|
||||
})
|
||||
t.Run("forward", func(t *testing.T) {
|
||||
t.Run("_bucket", func(t *testing.T) {
|
||||
onlyOldResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds_bucket", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="old"}`: testFSamples,
|
||||
}, onlyOldResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, onlyOldResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_bucket"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds_bucket", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="new"}`: testFSamples,
|
||||
`{__name__="my_app_latency_milliseconds_bucket", __type__="histogram", __unit__="milliseconds", code="200", le="10000", test="old"}`: testFSamples,
|
||||
}, compatibleResult)
|
||||
})
|
||||
t.Run("_count", func(t *testing.T) {
|
||||
onlyOldResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds_count", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
|
||||
}, onlyOldResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, onlyOldResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_count"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds_count", __type__="histogram", __unit__="milliseconds", code="200", test="new"}`: testFSamples,
|
||||
`{__name__="my_app_latency_milliseconds_count", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples,
|
||||
}, compatibleResult)
|
||||
})
|
||||
t.Run("_sum", func(t *testing.T) {
|
||||
onlyOldResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds_sum", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples, // TODO(bwplotka): Type and unit proposal is not really consistent with count/sum
|
||||
}, onlyOldResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, onlyOldResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_app_latency_milliseconds_sum"),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds_sum", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testFSamples,
|
||||
`{__name__="my_app_latency_milliseconds_sum", __type__="histogram", __unit__="milliseconds", code="200", test="new"}`: scaleSamples(testFSamples, true, 1000),
|
||||
}, compatibleResult)
|
||||
})
|
||||
})
|
||||
})
|
||||
t.Run("native histogram", func(t *testing.T) {
|
||||
db := openTestDB(t, nil, []appendSeries{
|
||||
{series: testdataLatencySeriesOld, samples: testNHCBSamples},
|
||||
{series: testdataLatencySeriesNew, samples: testNHCBSamples},
|
||||
})
|
||||
|
||||
notAware, err := db.Querier(0, samples)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, notAware.Close())
|
||||
})
|
||||
aware, err := AwareStorage(db).Querier(0, samples)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, aware.Close())
|
||||
})
|
||||
|
||||
t.Run("backward", func(t *testing.T) {
|
||||
onlyNewResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds", __schema_url__="` + testSchemaURL("v1.1.0") + `", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testNHCBSamples,
|
||||
}, onlyNewResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")),
|
||||
)
|
||||
require.Equal(t, onlyNewResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.1.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesNew.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesNew.Get("code")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_seconds", __type__="histogram", __unit__="seconds", code="200", test="new"}`: testNHCBSamples,
|
||||
`{__name__="my_app_latency_seconds", __type__="histogram", __unit__="seconds", code="200", test="old"}`: scaleSamples(testNHCBSamples, false, 1000),
|
||||
}, compatibleResult)
|
||||
})
|
||||
t.Run("forward", func(t *testing.T) {
|
||||
onlyOldResult := query(t, notAware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds", __schema_url__="` + testSchemaURL("v1.0.0") + `", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testNHCBSamples,
|
||||
}, onlyOldResult)
|
||||
got := query(t, aware,
|
||||
// Without schema selector, semconv aware storage should have no effect.
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")),
|
||||
)
|
||||
require.Equal(t, onlyOldResult, got)
|
||||
|
||||
compatibleResult := query(t, aware,
|
||||
labels.MustNewMatcher(labels.MatchEqual, schemaURLLabel, testSchemaURL("v1.0.0")),
|
||||
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testdataLatencySeriesOld.MetricIdentity().Name),
|
||||
labels.MustNewMatcher(labels.MatchEqual, "code", testdataLatencySeriesOld.Get("code")),
|
||||
)
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
`{__name__="my_app_latency_milliseconds", __type__="histogram", __unit__="milliseconds", code="200", test="new"}`: scaleSamples(testNHCBSamples, true, 1000),
|
||||
`{__name__="my_app_latency_milliseconds", __type__="histogram", __unit__="milliseconds", code="200", test="old"}`: testNHCBSamples,
|
||||
}, compatibleResult)
|
||||
})
|
||||
})
|
||||
}
|
||||
36
semconv/testdata/latest/.gen/changelog.yaml
vendored
Normal file
36
semconv/testdata/latest/.gen/changelog.yaml
vendored
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
# Version of this file.
|
||||
version: 1
|
||||
|
||||
# changelog contains all changes made to elements with the same semantic id.
|
||||
metrics_changelog:
|
||||
my_app_latency:
|
||||
- forward:
|
||||
metric_name: my_app_latency_seconds
|
||||
unit: "{seconds}"
|
||||
value_promql: "metric{} / 1000"
|
||||
backward:
|
||||
metric_name: my_app_latency_milliseconds
|
||||
unit: "{milliseconds}"
|
||||
value_promql: "metric{} * 1000"
|
||||
|
||||
my_app_custom_elements:
|
||||
- forward:
|
||||
metric_name: my_app_custom_elements_changed_total
|
||||
attributes:
|
||||
- tag: "number"
|
||||
- tag: "class"
|
||||
members:
|
||||
- value: "FIRST"
|
||||
- value: "SECOND"
|
||||
- value: "OTHER"
|
||||
backward:
|
||||
metric_name: my_app_custom_elements_total
|
||||
attributes:
|
||||
- tag: "integer"
|
||||
- tag: "category"
|
||||
members:
|
||||
- value: "first"
|
||||
- value: "second"
|
||||
- value: "other"
|
||||
|
||||
my_app_some_elements_totals:
|
||||
20
semconv/testdata/latest/.gen/ids.yaml
vendored
Normal file
20
semconv/testdata/latest/.gen/ids.yaml
vendored
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
# Version of this file.
|
||||
version: 1
|
||||
|
||||
# map from identity of an element to its id(s).
|
||||
metrics_ids:
|
||||
my_app_latency_seconds~seconds.histogram:
|
||||
- id: "my_app_latency.2"
|
||||
intro_version: "v1.1.0" # When introduced.
|
||||
my_app_custom_elements_changed_total.counter:
|
||||
- id: "my_app_custom_elements.2"
|
||||
intro_version: "v1.1.0"
|
||||
my_app_latency_milliseconds~milliseconds.histogram:
|
||||
- id: "my_app_latency"
|
||||
intro_version: "v1.0.0"
|
||||
my_app_custom_elements_total.counter:
|
||||
- id: "my_app_custom_elements"
|
||||
intro_version: "v1.0.0"
|
||||
my_app_some_elements~gauge:
|
||||
- id: "my_app_some_elements"
|
||||
intro_version: "v1.0.0"
|
||||
0
semconv/testdata/v1.0.0/.gitkeep
vendored
Normal file
0
semconv/testdata/v1.0.0/.gitkeep
vendored
Normal file
0
semconv/testdata/v1.1.0/.gitkeep
vendored
Normal file
0
semconv/testdata/v1.1.0/.gitkeep
vendored
Normal file
65
semconv/value.go
Normal file
65
semconv/value.go
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
)
|
||||
|
||||
type valueTransformer struct {
|
||||
expr parser.Expr
|
||||
}
|
||||
|
||||
func newValueTransformer(toPromQL string) (*valueTransformer, error) {
|
||||
p := parser.NewParser(toPromQL)
|
||||
expr, err := p.ParseExpr()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't parse %v: %w", toPromQL, err)
|
||||
}
|
||||
if _, err = transform(expr, 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &valueTransformer{expr: expr}, nil
|
||||
}
|
||||
|
||||
func (t *valueTransformer) Transform(v float64) float64 {
|
||||
if t == nil {
|
||||
return v // Noop.
|
||||
}
|
||||
// We did what we could and tested transform in constructor, skipping here.
|
||||
v, _ = transform(t.expr, v)
|
||||
return v
|
||||
}
|
||||
|
||||
func transform(node parser.Expr, in float64) (float64, error) {
|
||||
switch e := node.(type) {
|
||||
case *parser.NumberLiteral:
|
||||
return e.Val, nil
|
||||
case *parser.VectorSelector:
|
||||
return in, nil
|
||||
case *parser.BinaryExpr:
|
||||
lhs, err := transform(e.LHS, in)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
rhs, err := transform(e.RHS, in)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
switch e.Op {
|
||||
case parser.ADD:
|
||||
return lhs + rhs, nil
|
||||
case parser.SUB:
|
||||
return lhs - rhs, nil
|
||||
case parser.MUL:
|
||||
return lhs * rhs, nil
|
||||
case parser.DIV:
|
||||
return lhs / rhs, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("binary operator %v not allowed", parser.ItemTypeStr[e.Op])
|
||||
}
|
||||
default:
|
||||
return 0, fmt.Errorf("PromQL node %v not allowed", e.Type())
|
||||
}
|
||||
}
|
||||
24
semconv/value_test.go
Normal file
24
semconv/value_test.go
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
package semconv
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestValueTransformer(t *testing.T) {
|
||||
v, err := newValueTransformer("value{} / 1000")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, float64(4), v.Transform(4000))
|
||||
require.Equal(t, float64(4), v.Transform(4000))
|
||||
|
||||
v, err = newValueTransformer("whatever{foo=\"bar\"} * 1024")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, float64(81920), v.Transform(80))
|
||||
require.Equal(t, float64(81920), v.Transform(80))
|
||||
|
||||
v, err = newValueTransformer("a{} + 15 - 44")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, float64(-27), v.Transform(2))
|
||||
require.Equal(t, float64(-27), v.Transform(2))
|
||||
}
|
||||
Loading…
Reference in a new issue