From d68982d38ec8a74e60012027a91a7110a3e1c06e Mon Sep 17 00:00:00 2001 From: bwplotka Date: Sun, 30 Mar 2025 22:13:03 +0100 Subject: [PATCH] Add support for overrides. Signed-off-by: bwplotka --- cmd/prometheus/main.go | 20 +++++++--- config/config.go | 18 +++++++++ semconv/engine.go | 84 ++++++++++++++++++++++++++------------- semconv/engine_test.go | 13 ++++++ semconv/gen_files.go | 13 ++++++ semconv/gen_files_test.go | 57 +++++++++++++++++++++----- semconv/storage.go | 20 +++++++++- semconv/storage_test.go | 29 +++++++++++--- semconv/value.go | 13 ++++++ semconv/value_test.go | 13 ++++++ storage/merge.go | 10 +---- 11 files changed, 230 insertions(+), 60 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 8a4902a8d1..a1595dc61c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -728,14 +728,15 @@ func main() { ) var ( - localStorage = &readyStorage{stats: tsdb.NewDBStats()} - scraper = &readyScrapeManager{} - remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper) - fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) + localStorage = &readyStorage{stats: tsdb.NewDBStats()} + scraper = &readyScrapeManager{} + remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper) + fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) + semconvReloader func(*config.Config) error ) if cfg.enableSemconvVersionedRead { - fanoutStorage = semconv.AwareStorage(fanoutStorage) + fanoutStorage, semconvReloader = semconv.AwareStorage(fanoutStorage) } var ( @@ -969,6 +970,15 @@ func main() { } return discoveryManagerNotify.ApplyConfig(c) }, + }, + { + name: "semconv", + reloader: func(c *config.Config) error { + if 
!cfg.enableSemconvVersionedRead { + return nil + } + return semconvReloader(c) + }, }, { name: "rules", reloader: func(cfg *config.Config) error { diff --git a/config/config.go b/config/config.go index a38080f22a..8e16d635d6 100644 --- a/config/config.go +++ b/config/config.go @@ -272,10 +272,28 @@ type Config struct { RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"` OTLPConfig OTLPConfig `yaml:"otlp,omitempty"` + SemConv SemConvConfig `yaml:"semconv,omitempty"` loaded bool // Certain methods require configuration to use Load validation. } +// SemConvConfig represents semconv package runtime configuration. +type SemConvConfig struct { + // SchemaOverrides is a local mapping from the schema base URL (identifying schema + // registries) to an alternative schema base URL. URLs are considered local + // (within the process local filesystem) if they don't start with the "http" prefix. + // + // This option is useful when manual adjustments to the schema are needed or + // when reaching the internet is not an option (local cache). + // + // For example, imagine data that uses a schema URL like https://bwplotka.dev/semconv/1.1.0. + // Setting an override with "https://bwplotka.dev/semconv" -> "./semconv" will + // cause Prometheus to look for the schema artifacts (e.g. in the current implementation + // "./semconv/changelog.yaml" and "./semconv/ids.yaml" files) in the local + // filesystem. + SchemaOverrides map[string]string `yaml:"schema_overrides,omitempty"` +} + // SetDirectory joins any relative file paths with dir. // This method writes to config, and it's not concurrency safe. 
func (c *Config) SetDirectory(dir string) { diff --git a/semconv/engine.go b/semconv/engine.go index 2368c3a902..7f8df62891 100644 --- a/semconv/engine.go +++ b/semconv/engine.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package semconv import ( @@ -12,6 +25,8 @@ import ( "github.com/maruel/natural" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" ) @@ -19,16 +34,19 @@ const cacheTTL = 1 * time.Hour type schemaEngine struct { // TODO(bwplotka): Implement GC logic for ttl and limits. - cachedIDs map[string]*ids - cacheIDsMu sync.RWMutex - cachedChangelog map[string]*changelog - cacheChangelogMu sync.RWMutex + cachedIDs map[string]*ids + cacheMu sync.RWMutex + cachedChangelog map[string]*changelog + + schemaBaseOverride map[string]string } func newSchemaEngine() *schemaEngine { return &schemaEngine{ cachedIDs: map[string]*ids{}, cachedChangelog: map[string]*changelog{}, + + schemaBaseOverride: map[string]string{}, } } @@ -101,10 +119,24 @@ func (b matcherBuilder) ToMatchers(extraNameSuffix string) []*labels.Matcher { return append(ret, b.other...) 
} -func (e *schemaEngine) fetchIDs(schemaIDsURL string) (_ *ids, err error) { - e.cacheIDsMu.RLock() +func (e *schemaEngine) ApplyConfig(cfg *config.Config) error { + e.cacheMu.Lock() + e.schemaBaseOverride = cfg.SemConv.SchemaOverrides + e.cacheMu.Unlock() + return nil +} + +func (e *schemaEngine) fetchIDs(schemaURL string) (_ *ids, err error) { + e.cacheMu.RLock() + // NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/ + schemaBase, _ := path.Split(schemaURL) + schemaBase = strings.TrimSuffix(schemaBase, "/") + if o, ok := e.schemaBaseOverride[schemaBase]; ok { + schemaBase = o + } + schemaIDsURL := fmt.Sprintf("%v/ids.yaml", schemaBase) ids, ok := e.cachedIDs[schemaIDsURL] - e.cacheIDsMu.RUnlock() + e.cacheMu.RUnlock() if ok && time.Now().Sub(ids.fetchTime) < cacheTTL { return ids, nil } @@ -113,16 +145,24 @@ func (e *schemaEngine) fetchIDs(schemaIDsURL string) (_ *ids, err error) { if err != nil { return nil, err } - e.cacheIDsMu.Lock() + e.cacheMu.Lock() e.cachedIDs[schemaIDsURL] = ids - e.cacheIDsMu.Unlock() + e.cacheMu.Unlock() return ids, nil } -func (e *schemaEngine) fetchChangelog(schemaChangelogURL string) (_ *changelog, err error) { - e.cacheChangelogMu.RLock() +func (e *schemaEngine) fetchChangelog(schemaURL string) (_ *changelog, err error) { + e.cacheMu.RLock() + // NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/ + schemaBase, _ := path.Split(schemaURL) + schemaBase = strings.TrimSuffix(schemaBase, "/") + if o, ok := e.schemaBaseOverride[schemaBase]; ok { + schemaBase = o + } + schemaChangelogURL := fmt.Sprintf("%v/changelog.yaml", schemaBase) + ch, ok := e.cachedChangelog[schemaChangelogURL] - e.cacheChangelogMu.RUnlock() + e.cacheMu.RUnlock() if ok && time.Now().Sub(ch.fetchTime) < cacheTTL { return ch, nil } @@ -131,24 +171,12 @@ func (e *schemaEngine) fetchChangelog(schemaChangelogURL string) (_ *changelog, if err != nil { return nil, err } - e.cacheChangelogMu.Lock() + e.cacheMu.Lock() 
e.cachedChangelog[schemaChangelogURL] = ch - e.cacheChangelogMu.Unlock() + e.cacheMu.Unlock() return ch, nil } -func schemaChangelogURL(schemaURL string) string { - // NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/ - dir, _ := path.Split(schemaURL) - return fmt.Sprintf("%v/changelog.yaml", dir) -} - -func schemaIDsURL(schemaURL string) string { - // NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/ - dir, _ := path.Split(schemaURL) - return fmt.Sprintf("%v/ids.yaml", dir) -} - // findMetricID returns the metric ID from the schema definition for this identity and schema URL. // This allows parsing semantic ID and the revision number. This function also returns // magicSuffix that was matched if any. @@ -156,7 +184,7 @@ func (e *schemaEngine) findMetricID(schemaURL string, metric labels.MetricIdenti schemaVersion := path.Base(schemaURL) // TODO(bwplotka): This assumes such a file structure is part of the spec. - ids, err := e.fetchIDs(schemaIDsURL(schemaURL)) + ids, err := e.fetchIDs(schemaURL) if err != nil { return "", "", fmt.Errorf("based on __schema_url__=%v; %w", schemaURL, err) } @@ -224,7 +252,7 @@ func (e *schemaEngine) FindMatcherVariants(schemaURL string, originalMatchers [] return nil, q, fmt.Errorf("FindMetricID: %w", err) } - ch, err := e.fetchChangelog(schemaChangelogURL(schemaURL)) + ch, err := e.fetchChangelog(schemaURL) if err != nil { return nil, q, err } diff --git a/semconv/engine_test.go b/semconv/engine_test.go index d28093a500..cd98ea7865 100644 --- a/semconv/engine_test.go +++ b/semconv/engine_test.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package semconv import ( diff --git a/semconv/gen_files.go b/semconv/gen_files.go index d587cf85fc..0e3c920b06 100644 --- a/semconv/gen_files.go +++ b/semconv/gen_files.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package semconv import ( diff --git a/semconv/gen_files_test.go b/semconv/gen_files_test.go index 1e7f1e51fe..23286ea32c 100644 --- a/semconv/gen_files_test.go +++ b/semconv/gen_files_test.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + package semconv import ( @@ -8,14 +21,14 @@ import ( var ( testdataElementsChanges = []change{ - { - Forward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "my_number"}}}, - Backward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}}}, - }, { Forward: metricGroupChange{MetricName: "my_app_custom_changed_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}, {Tag: "class", Members: []attributeMember{{Value: "FIRST"}, {Value: "SECOND"}, {Value: "OTHER"}}}}}, Backward: metricGroupChange{MetricName: "my_app_custom_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "integer"}, {Tag: "category", Members: []attributeMember{{Value: "first"}, {Value: "second"}, {Value: "other"}}}}}, }, + { + Forward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "my_number"}}}, + Backward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}}}, + }, } testdataLatencyChanges = []change{ { @@ -35,24 +48,35 @@ func TestFetchChangelog(t *testing.T) { } t.Run("local", func(t *testing.T) { - got, err := fetchChangelog("./testdata/changelog.yaml") + e := newSchemaEngine() + got, err := e.fetchChangelog("./testdata/1.1.0") require.NoError(t, err) require.Equal(t, expected, got) }) // TODO(bwplotka): Move to something Prometheus owns e.g. internal Prometheus repo path. 
t.Run("http", func(t *testing.T) { - got, err := fetchChangelog("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/changelog.yaml") + e := newSchemaEngine() + got, err := e.fetchChangelog("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/1.1.0") require.NoError(t, err) require.Equal(t, expected, got) }) t.Run("http-custom", func(t *testing.T) { - got, err := fetchChangelog("https://bwplotka.dev/semconv/changelog.yaml") + e := newSchemaEngine() + got, err := e.fetchChangelog("https://bwplotka.dev/semconv/1.0.0") + require.NoError(t, err) + require.Equal(t, expected, got) + }) + t.Run("override", func(t *testing.T) { + e := newSchemaEngine() + e.schemaBaseOverride["https://bwplotka.dev/YOLO"] = "./testdata" + + got, err := e.fetchChangelog("https://bwplotka.dev/YOLO/1.1.0") require.NoError(t, err) require.Equal(t, expected, got) }) } -func TestFetchIDs(t *testing.T) { +func TestSchemaEngine_FetchIDs(t *testing.T) { expected := &ids{ Version: 1, MetricsIDs: map[string][]versionedID{ @@ -106,17 +130,28 @@ func TestFetchIDs(t *testing.T) { } t.Run("local", func(t *testing.T) { - got, err := fetchIDs("./testdata/ids.yaml") + e := newSchemaEngine() + got, err := e.fetchIDs("./testdata/1.1.0") require.NoError(t, err) require.Equal(t, expected, got) }) t.Run("http", func(t *testing.T) { - got, err := fetchIDs("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/ids.yaml") + e := newSchemaEngine() + got, err := e.fetchIDs("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/1.1.0") require.NoError(t, err) require.Equal(t, expected, got) }) t.Run("http-custom", func(t *testing.T) { - got, err := fetchIDs("https://bwplotka.dev/semconv/ids.yaml") + e := newSchemaEngine() + got, err := e.fetchIDs("https://bwplotka.dev/semconv/1.1.0") + require.NoError(t, err) + require.Equal(t, expected, got) + }) + 
t.Run("override", func(t *testing.T) { + e := newSchemaEngine() + e.schemaBaseOverride["https://bwplotka.dev/YOLO"] = "./testdata" + + got, err := e.fetchIDs("https://bwplotka.dev/YOLO/1.1.0") require.NoError(t, err) require.Equal(t, expected, got) }) diff --git a/semconv/storage.go b/semconv/storage.go index 83813b4eeb..87c7da5dfd 100644 --- a/semconv/storage.go +++ b/semconv/storage.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package semconv import ( @@ -8,6 +21,8 @@ import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -20,8 +35,9 @@ const schemaURLLabel = "__schema_url__" // AwareStorage wraps given storage with a semconv awareness that // performs versioned read when __schema_url__ matcher is provided. // TODO(bwplotka): Technically we only need Querier? 
-func AwareStorage(s storage.Storage) storage.Storage { - return &awareStorage{Storage: s, engine: newSchemaEngine()} +func AwareStorage(s storage.Storage) (storage.Storage, func(config *config.Config) error) { + e := newSchemaEngine() + return &awareStorage{Storage: s, engine: e}, e.ApplyConfig } type awareStorage struct { diff --git a/semconv/storage_test.go b/semconv/storage_test.go index 8ebae2ed88..8ce3d8d593 100644 --- a/semconv/storage_test.go +++ b/semconv/storage_test.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package semconv import ( @@ -9,7 +22,6 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/timestamp" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/model/histogram" @@ -281,7 +293,8 @@ func TestAwareStorage(t *testing.T) { t.Cleanup(func() { require.NoError(t, notAware.Close()) }) - aware, err := AwareStorage(db).Querier(0, samples) + s, _ := AwareStorage(db) + aware, err := s.Querier(0, samples) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, aware.Close()) @@ -379,7 +392,8 @@ func TestAwareStorage(t *testing.T) { t.Cleanup(func() { require.NoError(t, notAware.Close()) }) - aware, err := AwareStorage(db).Querier(0, samples) + s, _ := AwareStorage(db) + aware, err := s.Querier(0, samples) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, aware.Close()) @@ -557,7 +571,8 @@ func TestAwareStorage(t *testing.T) { t.Cleanup(func() { require.NoError(t, notAware.Close()) }) - aware, err := AwareStorage(db).Querier(0, samples) + s, _ := AwareStorage(db) + aware, err := s.Querier(0, samples) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, aware.Close()) @@ -642,8 +657,9 @@ func TestAwareStorage_PromQL_OverlappingSeries(t *testing.T) { {series: seriesNew, samples: testFSamples[5:]}, }) + s, _ := AwareStorage(db) q, err := e.NewInstantQuery( - ctx, AwareStorage(db), nil, + ctx, s, nil, fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10), ) require.NoError(t, err) @@ -662,8 +678,9 @@ func TestAwareStorage_PromQL_OverlappingSeries(t *testing.T) { {series: seriesNew, samples: testFSamples}, }) + s, _ := AwareStorage(db) q, err := e.NewInstantQuery( - ctx, AwareStorage(db), nil, + ctx, s, nil, fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10), ) require.NoError(t, err) diff --git a/semconv/value.go b/semconv/value.go index 
a491fde43a..cc1e82c8f2 100644 --- a/semconv/value.go +++ b/semconv/value.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package semconv import ( diff --git a/semconv/value_test.go b/semconv/value_test.go index b835a503bd..ee04d9c7b5 100644 --- a/semconv/value_test.go +++ b/semconv/value_test.go @@ -1,3 +1,16 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package semconv import ( diff --git a/storage/merge.go b/storage/merge.go index f477fae59b..f65cec6850 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -408,20 +408,14 @@ func (c *genericMergeSeriesSet) Next() bool { func (c *genericMergeSeriesSet) At() Labels { if len(c.currentSets) == 1 { - at := c.currentSets[0].At() - fmt.Println("DEBUG: (one) Returning", at.Labels(), "from", at) - return at + return c.currentSets[0].At() } series := make([]Labels, 0, len(c.currentSets)) for _, seriesSet := range c.currentSets { at := seriesSet.At() - fmt.Println("DEBUG: Got", at.Labels(), "from", at) series = append(series, at) } - // DEBUG. - l := c.mergeFunc(series...) - fmt.Println("DEBUG: Returning", l.Labels(), "from", l) - return l + return c.mergeFunc(series...) } func (c *genericMergeSeriesSet) Err() error {