Add support for overrides.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-03-30 22:13:03 +01:00
parent f344f7343f
commit d68982d38e
11 changed files with 230 additions and 60 deletions

View file

@ -728,14 +728,15 @@ func main() {
)
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
semconvReloader func(*config.Config) error
)
if cfg.enableSemconvVersionedRead {
fanoutStorage = semconv.AwareStorage(fanoutStorage)
fanoutStorage, semconvReloader = semconv.AwareStorage(fanoutStorage)
}
var (
@ -969,6 +970,15 @@ func main() {
}
return discoveryManagerNotify.ApplyConfig(c)
},
},
{
name: "semconv",
reloader: func(c *config.Config) error {
if !cfg.enableSemconvVersionedRead {
return nil
}
return semconvReloader(c)
},
}, {
name: "rules",
reloader: func(cfg *config.Config) error {

View file

@ -272,10 +272,28 @@ type Config struct {
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
OTLPConfig OTLPConfig `yaml:"otlp,omitempty"`
SemConv SemConvConfig `yaml:"semconv,omitempty"`
loaded bool // Certain methods require configuration to use Load validation.
}
// SemConvConfig represents semconv package runtime configuration.
type SemConvConfig struct {
// SchemaOverrides is a local mapping from a schema base URL (identifying a
// schema registry) to an alternative schema base URL. URLs are considered local
// (within the process-local filesystem) if they don't start with the "http" prefix.
//
// This option is useful when manual adjustments to a schema are needed, or
// when reaching the internet is not an option (local cache).
//
// For example, imagine data that uses a schema URL like https://bwplotka.dev/semconv/1.1.0.
// Setting an override with "https://bwplotka.dev/semconv" -> "./semconv" will
// cause Prometheus to look for the schema artifacts (e.g. in the current
// implementation, the "./semconv/changelog.yaml" and "./semconv/ids.yaml" files)
// in the local filesystem.
SchemaOverrides map[string]string `yaml:"schema_overrides,omitempty"`
}
// SetDirectory joins any relative file paths with dir.
// This method writes to config, and it's not concurrency safe.
func (c *Config) SetDirectory(dir string) {

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (
@ -12,6 +25,8 @@ import (
"github.com/maruel/natural"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
)
@ -19,16 +34,19 @@ const cacheTTL = 1 * time.Hour
type schemaEngine struct {
// TODO(bwplotka): Implement GC logic for ttl and limits.
cachedIDs map[string]*ids
cacheIDsMu sync.RWMutex
cachedChangelog map[string]*changelog
cacheChangelogMu sync.RWMutex
cachedIDs map[string]*ids
cacheMu sync.RWMutex
cachedChangelog map[string]*changelog
schemaBaseOverride map[string]string
}
// newSchemaEngine returns a schemaEngine whose IDs cache, changelog cache and
// schema base override map are all initialized to empty, non-nil maps.
func newSchemaEngine() *schemaEngine {
	engine := new(schemaEngine)
	engine.cachedIDs = make(map[string]*ids)
	engine.cachedChangelog = make(map[string]*changelog)
	engine.schemaBaseOverride = make(map[string]string)
	return engine
}
@ -101,10 +119,24 @@ func (b matcherBuilder) ToMatchers(extraNameSuffix string) []*labels.Matcher {
return append(ret, b.other...)
}
func (e *schemaEngine) fetchIDs(schemaIDsURL string) (_ *ids, err error) {
e.cacheIDsMu.RLock()
// ApplyConfig replaces the engine's schema base URL overrides with the ones
// found in cfg.SemConv.SchemaOverrides. The swap happens under the engine's
// write lock. It always returns nil.
func (e *schemaEngine) ApplyConfig(cfg *config.Config) error {
	overrides := cfg.SemConv.SchemaOverrides

	e.cacheMu.Lock()
	defer e.cacheMu.Unlock()

	e.schemaBaseOverride = overrides
	return nil
}
func (e *schemaEngine) fetchIDs(schemaURL string) (_ *ids, err error) {
e.cacheMu.RLock()
// NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/
schemaBase, _ := path.Split(schemaURL)
schemaBase = strings.TrimSuffix(schemaBase, "/")
if o, ok := e.schemaBaseOverride[schemaBase]; ok {
schemaBase = o
}
schemaIDsURL := fmt.Sprintf("%v/ids.yaml", schemaBase)
ids, ok := e.cachedIDs[schemaIDsURL]
e.cacheIDsMu.RUnlock()
e.cacheMu.RUnlock()
if ok && time.Now().Sub(ids.fetchTime) < cacheTTL {
return ids, nil
}
@ -113,16 +145,24 @@ func (e *schemaEngine) fetchIDs(schemaIDsURL string) (_ *ids, err error) {
if err != nil {
return nil, err
}
e.cacheIDsMu.Lock()
e.cacheMu.Lock()
e.cachedIDs[schemaIDsURL] = ids
e.cacheIDsMu.Unlock()
e.cacheMu.Unlock()
return ids, nil
}
func (e *schemaEngine) fetchChangelog(schemaChangelogURL string) (_ *changelog, err error) {
e.cacheChangelogMu.RLock()
func (e *schemaEngine) fetchChangelog(schemaURL string) (_ *changelog, err error) {
e.cacheMu.RLock()
// NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/
schemaBase, _ := path.Split(schemaURL)
schemaBase = strings.TrimSuffix(schemaBase, "/")
if o, ok := e.schemaBaseOverride[schemaBase]; ok {
schemaBase = o
}
schemaChangelogURL := fmt.Sprintf("%v/changelog.yaml", schemaBase)
ch, ok := e.cachedChangelog[schemaChangelogURL]
e.cacheChangelogMu.RUnlock()
e.cacheMu.RUnlock()
if ok && time.Now().Sub(ch.fetchTime) < cacheTTL {
return ch, nil
}
@ -131,24 +171,12 @@ func (e *schemaEngine) fetchChangelog(schemaChangelogURL string) (_ *changelog,
if err != nil {
return nil, err
}
e.cacheChangelogMu.Lock()
e.cacheMu.Lock()
e.cachedChangelog[schemaChangelogURL] = ch
e.cacheChangelogMu.Unlock()
e.cacheMu.Unlock()
return ch, nil
}
func schemaChangelogURL(schemaURL string) string {
// NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/
dir, _ := path.Split(schemaURL)
return fmt.Sprintf("%v/changelog.yaml", dir)
}
func schemaIDsURL(schemaURL string) string {
// NOTE(bwplotka): Be careful with path as it cleans potential http:// to http:/
dir, _ := path.Split(schemaURL)
return fmt.Sprintf("%v/ids.yaml", dir)
}
// findMetricID returns the metric ID from the schema definition for this identity and schema URL.
// This allows parsing semantic ID and the revision number. This function also returns
// magicSuffix that was matched if any.
@ -156,7 +184,7 @@ func (e *schemaEngine) findMetricID(schemaURL string, metric labels.MetricIdenti
schemaVersion := path.Base(schemaURL)
// TODO(bwplotka): This assumes such a file structure is part of the spec.
ids, err := e.fetchIDs(schemaIDsURL(schemaURL))
ids, err := e.fetchIDs(schemaURL)
if err != nil {
return "", "", fmt.Errorf("based on __schema_url__=%v; %w", schemaURL, err)
}
@ -224,7 +252,7 @@ func (e *schemaEngine) FindMatcherVariants(schemaURL string, originalMatchers []
return nil, q, fmt.Errorf("FindMetricID: %w", err)
}
ch, err := e.fetchChangelog(schemaChangelogURL(schemaURL))
ch, err := e.fetchChangelog(schemaURL)
if err != nil {
return nil, q, err
}

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (
@ -8,14 +21,14 @@ import (
var (
testdataElementsChanges = []change{
{
Forward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "my_number"}}},
Backward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}}},
},
{
Forward: metricGroupChange{MetricName: "my_app_custom_changed_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}, {Tag: "class", Members: []attributeMember{{Value: "FIRST"}, {Value: "SECOND"}, {Value: "OTHER"}}}}},
Backward: metricGroupChange{MetricName: "my_app_custom_elements_total", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "integer"}, {Tag: "category", Members: []attributeMember{{Value: "first"}, {Value: "second"}, {Value: "other"}}}}},
},
{
Forward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "my_number"}}},
Backward: metricGroupChange{MetricName: "", Unit: "", ValuePromQL: "", Attributes: []attribute{{Tag: "number"}}},
},
}
testdataLatencyChanges = []change{
{
@ -35,24 +48,35 @@ func TestFetchChangelog(t *testing.T) {
}
t.Run("local", func(t *testing.T) {
got, err := fetchChangelog("./testdata/changelog.yaml")
e := newSchemaEngine()
got, err := e.fetchChangelog("./testdata/1.1.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})
// TODO(bwplotka): Move to something Prometheus owns e.g. internal Prometheus repo path.
t.Run("http", func(t *testing.T) {
got, err := fetchChangelog("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/changelog.yaml")
e := newSchemaEngine()
got, err := e.fetchChangelog("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/1.1.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})
t.Run("http-custom", func(t *testing.T) {
got, err := fetchChangelog("https://bwplotka.dev/semconv/changelog.yaml")
e := newSchemaEngine()
got, err := e.fetchChangelog("https://bwplotka.dev/semconv/1.0.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})
t.Run("override", func(t *testing.T) {
e := newSchemaEngine()
e.schemaBaseOverride["https://bwplotka.dev/YOLO"] = "./testdata"
got, err := e.fetchChangelog("https://bwplotka.dev/YOLO/1.1.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})
}
func TestFetchIDs(t *testing.T) {
func TestSchemaEngine_FetchIDs(t *testing.T) {
expected := &ids{
Version: 1,
MetricsIDs: map[string][]versionedID{
@ -106,17 +130,28 @@ func TestFetchIDs(t *testing.T) {
}
t.Run("local", func(t *testing.T) {
got, err := fetchIDs("./testdata/ids.yaml")
e := newSchemaEngine()
got, err := e.fetchIDs("./testdata/1.1.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})
t.Run("http", func(t *testing.T) {
got, err := fetchIDs("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/ids.yaml")
e := newSchemaEngine()
got, err := e.fetchIDs("https://raw.githubusercontent.com/bwplotka/metric-rename-demo/refs/heads/diff/my-org/semconv/1.1.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})
t.Run("http-custom", func(t *testing.T) {
got, err := fetchIDs("https://bwplotka.dev/semconv/ids.yaml")
e := newSchemaEngine()
got, err := e.fetchIDs("https://bwplotka.dev/semconv/1.1.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})
t.Run("override", func(t *testing.T) {
e := newSchemaEngine()
e.schemaBaseOverride["https://bwplotka.dev/YOLO"] = "./testdata"
got, err := e.fetchIDs("https://bwplotka.dev/YOLO/1.1.0")
require.NoError(t, err)
require.Equal(t, expected, got)
})

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (
@ -8,6 +21,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -20,8 +35,9 @@ const schemaURLLabel = "__schema_url__"
// AwareStorage wraps the given storage with semconv awareness, performing
// versioned reads when a __schema_url__ matcher is provided. It also returns
// a function that applies configuration to the underlying schema engine.
// TODO(bwplotka): Technically we only need Querier?
func AwareStorage(s storage.Storage) storage.Storage {
return &awareStorage{Storage: s, engine: newSchemaEngine()}
func AwareStorage(s storage.Storage) (storage.Storage, func(config *config.Config) error) {
e := newSchemaEngine()
return &awareStorage{Storage: s, engine: e}, e.ApplyConfig
}
type awareStorage struct {

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (
@ -9,7 +22,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/model/histogram"
@ -281,7 +293,8 @@ func TestAwareStorage(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, notAware.Close())
})
aware, err := AwareStorage(db).Querier(0, samples)
s, _ := AwareStorage(db)
aware, err := s.Querier(0, samples)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, aware.Close())
@ -379,7 +392,8 @@ func TestAwareStorage(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, notAware.Close())
})
aware, err := AwareStorage(db).Querier(0, samples)
s, _ := AwareStorage(db)
aware, err := s.Querier(0, samples)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, aware.Close())
@ -557,7 +571,8 @@ func TestAwareStorage(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, notAware.Close())
})
aware, err := AwareStorage(db).Querier(0, samples)
s, _ := AwareStorage(db)
aware, err := s.Querier(0, samples)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, aware.Close())
@ -642,8 +657,9 @@ func TestAwareStorage_PromQL_OverlappingSeries(t *testing.T) {
{series: seriesNew, samples: testFSamples[5:]},
})
s, _ := AwareStorage(db)
q, err := e.NewInstantQuery(
ctx, AwareStorage(db), nil,
ctx, s, nil,
fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10),
)
require.NoError(t, err)
@ -662,8 +678,9 @@ func TestAwareStorage_PromQL_OverlappingSeries(t *testing.T) {
{series: seriesNew, samples: testFSamples},
})
s, _ := AwareStorage(db)
q, err := e.NewInstantQuery(
ctx, AwareStorage(db), nil,
ctx, s, nil,
fmt.Sprintf("rate(%v{__schema_url__=%q}[10])", seriesOld.MetricIdentity().Name, testSchemaURL("1.0.0")), timestamp.Time(10),
)
require.NoError(t, err)

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (

View file

@ -1,3 +1,16 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package semconv
import (

View file

@ -408,20 +408,14 @@ func (c *genericMergeSeriesSet) Next() bool {
func (c *genericMergeSeriesSet) At() Labels {
if len(c.currentSets) == 1 {
at := c.currentSets[0].At()
fmt.Println("DEBUG: (one) Returning", at.Labels(), "from", at)
return at
return c.currentSets[0].At()
}
series := make([]Labels, 0, len(c.currentSets))
for _, seriesSet := range c.currentSets {
at := seriesSet.At()
fmt.Println("DEBUG: Got", at.Labels(), "from", at)
series = append(series, at)
}
// DEBUG.
l := c.mergeFunc(series...)
fmt.Println("DEBUG: Returning", l.Labels(), "from", l)
return l
return c.mergeFunc(series...)
}
func (c *genericMergeSeriesSet) Err() error {