From 794bf774c2d547f963b9c34ccdbcd26af59fc6ae Mon Sep 17 00:00:00 2001 From: bwplotka Date: Fri, 29 Aug 2025 08:16:37 +0100 Subject: [PATCH] Reapply "prw: use Unit and Type labels for metadata when feature flag is enabled (#17033)" This reverts commit f5fab4757733746a708e7b80324b8929c1b84856. --- cmd/prometheus/main.go | 2 +- storage/remote/queue_manager.go | 54 +++++----- storage/remote/queue_manager_test.go | 146 +++++++++++++++++++++++++-- storage/remote/read_test.go | 2 +- storage/remote/storage.go | 4 +- storage/remote/storage_test.go | 10 +- storage/remote/write.go | 7 +- storage/remote/write_test.go | 18 ++-- tsdb/agent/db_test.go | 6 +- web/api/v1/api_test.go | 2 +- 10 files changed, 195 insertions(+), 56 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 63e7302929..8cbdd762f0 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -782,7 +782,7 @@ func main() { var ( localStorage = &readyStorage{stats: tsdb.NewDBStats()} scraper = &readyScrapeManager{} - remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper) + remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 7cebdc3e37..dccbbd4ab7 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -40,6 +40,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/schema" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" @@ -407,16 +408,17 @@ type QueueManager struct { reshardDisableStartTimestamp atomic.Int64 // Time that reshard was disabled. reshardDisableEndTimestamp atomic.Int64 // Time that reshard is disabled until. - logger *slog.Logger - flushDeadline time.Duration - cfg config.QueueConfig - mcfg config.MetadataConfig - externalLabels []labels.Label - relabelConfigs []*relabel.Config - sendExemplars bool - sendNativeHistograms bool - watcher *wlog.Watcher - metadataWatcher *MetadataWatcher + logger *slog.Logger + flushDeadline time.Duration + cfg config.QueueConfig + mcfg config.MetadataConfig + externalLabels []labels.Label + relabelConfigs []*relabel.Config + sendExemplars bool + sendNativeHistograms bool + enableTypeAndUnitLabels bool + watcher *wlog.Watcher + metadataWatcher *MetadataWatcher clientMtx sync.RWMutex storeClient WriteClient @@ -468,6 +470,7 @@ func NewQueueManager( sm ReadyScrapeManager, enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, + enableTypeAndUnitLabels bool, protoMsg config.RemoteWriteProtoMsg, ) *QueueManager { if logger == nil { @@ -482,15 +485,16 @@ func NewQueueManager( logger = logger.With(remoteName, client.Name(), endpoint, client.Endpoint()) t := &QueueManager{ - logger: logger, - flushDeadline: flushDeadline, - cfg: cfg, - mcfg: mCfg, - externalLabels: extLabelsSlice, - relabelConfigs: relabelConfigs, - storeClient: client, - sendExemplars: enableExemplarRemoteWrite, - sendNativeHistograms: enableNativeHistogramRemoteWrite, + logger: logger, + flushDeadline: flushDeadline, + cfg: cfg, + mcfg: mCfg, + externalLabels: extLabelsSlice, + relabelConfigs: relabelConfigs, + storeClient: client, + sendExemplars: enableExemplarRemoteWrite, + sendNativeHistograms: enableNativeHistogramRemoteWrite, + enableTypeAndUnitLabels: enableTypeAndUnitLabels, seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata), @@ -1540,7 +1544,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } _ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, encBuf, compr) case config.RemoteWriteProtoMsgV2: - nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms) + nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms, s.qm.enableTypeAndUnitLabels) n := nPendingSamples + nPendingExemplars + nPendingHistograms if nUnexpectedMetadata > 0 { s.qm.logger.Warn("unexpected metadata sType in populateV2TimeSeries", "count", nUnexpectedMetadata) @@ -1911,7 +1915,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 return accumulatedStats, err } -func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int, int) { +func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms, enableTypeAndUnitLabels bool) (int, int, int, int, int) { var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata int for nPending, d := range batch { pendingData[nPending].Samples = pendingData[nPending].Samples[:0] @@ -1921,11 +1925,15 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData[nPending].Metadata.UnitRef = symbolTable.Symbolize(d.metadata.Unit) nPendingMetadata++ } else { + var m schema.Metadata + if enableTypeAndUnitLabels { + m = schema.NewMetadataFromLabels(d.seriesLabels) + } // Safeguard against sending garbage in case of not having metadata // for whatever reason. - pendingData[nPending].Metadata.Type = writev2.Metadata_METRIC_TYPE_UNSPECIFIED + pendingData[nPending].Metadata.Type = writev2.FromMetadataType(m.Type) + pendingData[nPending].Metadata.UnitRef = symbolTable.Symbolize(m.Unit) pendingData[nPending].Metadata.HelpRef = 0 - pendingData[nPending].Metadata.UnitRef = 0 } if sendExemplars { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 573b1acfea..58c7840821 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -42,6 +42,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/schema" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" @@ -132,7 +133,7 @@ func TestBasicContentNegotiation(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) defer s.Close() var ( @@ -241,7 +242,7 @@ func TestSampleDelivery(t *testing.T) { } { t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) defer s.Close() var ( @@ -322,7 +323,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg) return m } @@ -363,7 +364,7 @@ func TestMetadataDelivery(t *testing.T) { func TestWALMetadataDelivery(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) defer s.Close() cfg := config.DefaultQueueConfig @@ -782,7 +783,7 @@ func TestDisableReshardOnRetry(t *testing.T) { } ) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1) m.StoreSeries(fakeSeries, 0) // Attempt to samples while the manager is running. We immediately stop the @@ -1459,7 +1460,7 @@ func BenchmarkStoreSeries(b *testing.B) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1) m.externalLabels = tc.externalLabels m.relabelConfigs = tc.relabelConfigs @@ -1938,7 +1939,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) { totalSize := 0 for i := 0; i < b.N; i++ { - populateV2TimeSeries(&symbolTable, batch, seriesBuff, true, true) + populateV2TimeSeries(&symbolTable, batch, seriesBuff, true, true, false) req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, nil, cEnc, "snappy") if err != nil { b.Fatal(err) @@ -2344,8 +2345,7 @@ func TestPopulateV2TimeSeries_UnexpectedMetadata(t *testing.T) { } nSamples, nExemplars, nHistograms, nMetadata, nUnexpected := populateV2TimeSeries( - &symbolTable, batch, pendingData, false, false, - ) + &symbolTable, batch, pendingData, false, false, false) require.Equal(t, 2, nSamples, "Should count 2 samples") require.Equal(t, 0, nExemplars, "Should count 0 exemplars") @@ -2353,3 +2353,131 @@ func TestPopulateV2TimeSeries_UnexpectedMetadata(t *testing.T) { require.Equal(t, 0, nMetadata, "Should count 0 processed metadata") require.Equal(t, 2, nUnexpected, "Should count 2 unexpected metadata") } + +func TestPopulateV2TimeSeries_TypeAndUnitLabels(t *testing.T) { + symbolTable := writev2.NewSymbolTable() + + testCases := []struct { + name string + typeLabel string + unitLabel string + expectedType writev2.Metadata_MetricType + description string + }{ + { + name: "counter_with_unit", + typeLabel: "counter", + unitLabel: "operations", + expectedType: writev2.Metadata_METRIC_TYPE_COUNTER, + description: "Counter metric with operations unit", + }, + { + name: "gauge_with_bytes", + typeLabel: "gauge", + unitLabel: "bytes", + expectedType: writev2.Metadata_METRIC_TYPE_GAUGE, + description: "Gauge metric with bytes unit", + }, + { + name: "histogram_with_seconds", + typeLabel: "histogram", + unitLabel: "seconds", + expectedType: writev2.Metadata_METRIC_TYPE_HISTOGRAM, + description: "Histogram metric with seconds unit", + }, + { + name: "summary_with_ratio", + typeLabel: "summary", + unitLabel: "ratio", + expectedType: writev2.Metadata_METRIC_TYPE_SUMMARY, + description: "Summary metric with ratio unit", + }, + { + name: "info_no_unit", + typeLabel: "info", + unitLabel: "", + expectedType: writev2.Metadata_METRIC_TYPE_INFO, + description: "Info metric without unit", + }, + { + name: "stateset_no_unit", + typeLabel: "stateset", + unitLabel: "", + expectedType: writev2.Metadata_METRIC_TYPE_STATESET, + description: "Stateset metric without unit", + }, + { + name: "unknown_type", + typeLabel: "unknown_type", + unitLabel: "meters", + expectedType: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, + description: "Unknown type defaults to unspecified", + }, + { + name: "empty_type_with_unit", + typeLabel: "", + unitLabel: "watts", + expectedType: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, + description: "Empty type with unit", + }, + { + name: "type_no_unit", + typeLabel: "gauge", + unitLabel: "", + expectedType: writev2.Metadata_METRIC_TYPE_GAUGE, + description: "Type without unit", + }, + { + name: "no_type_no_unit", + typeLabel: "", + unitLabel: "", + expectedType: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, + description: "No type and no unit", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + batch := make([]timeSeries, 1) + builder := labels.NewScratchBuilder(2) + metadata := schema.Metadata{ + Name: "test_metric_" + tc.name, + Type: model.MetricType(tc.typeLabel), + Unit: tc.unitLabel, + } + + metadata.AddToLabels(&builder) + + batch[0] = timeSeries{ + seriesLabels: builder.Labels(), + value: 123.45, + timestamp: time.Now().UnixMilli(), + sType: tSample, + } + + pendingData := make([]writev2.TimeSeries, 1) + + symbolTable.Reset() + nSamples, nExemplars, nHistograms, _, _ := populateV2TimeSeries( + &symbolTable, + batch, + pendingData, + false, // sendExemplars + false, // sendNativeHistograms + true, // enableTypeAndUnitLabels + ) + + require.Equal(t, 1, nSamples, "Should have 1 sample") + require.Equal(t, 0, nExemplars, "Should have 0 exemplars") + require.Equal(t, 0, nHistograms, "Should have 0 histograms") + + require.Equal(t, tc.expectedType, pendingData[0].Metadata.Type, + "Type should match expected for %s", tc.description) + + unitRef := pendingData[0].Metadata.UnitRef + + symbols := symbolTable.Symbols() + require.Equal(t, tc.unitLabel, symbols[unitRef], "Unit should match") + }) + } +} diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 5997637c53..da0b7f81d4 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -93,7 +93,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { for _, tc := range cases { t.Run("", func(t *testing.T) { - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteReadConfigs: tc.cfgs, diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 3326d3bcd9..6067b33037 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -64,7 +64,7 @@ type Storage struct { } // NewStorage returns a remote.Storage. -func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage { +func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage { if l == nil { l = promslog.NewNopLogger() } @@ -76,7 +76,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC deduper: deduper, localStartTimeCallback: stCallback, } - s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm) + s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, enableTypeAndUnitLabels) return s } diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index 770a5df594..51e290bf48 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -29,7 +29,7 @@ import ( func TestStorageLifecycle(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) { func TestUpdateRemoteReadConfigs(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, @@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) { func TestFilterExternalLabels(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ @@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) { func TestIgnoreExternalLabels(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ @@ -154,7 +154,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig { // ApplyConfig runs concurrently with Notify // See https://github.com/prometheus/prometheus/issues/12747 func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) { - s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil) + s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false) var wg sync.WaitGroup wg.Add(2000) diff --git a/storage/remote/write.go b/storage/remote/write.go index 3deacfb664..b4458deac2 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -73,11 +73,12 @@ type WriteStorage struct { quit chan struct{} // For timestampTracker. - highestTimestamp *maxTimestamp + highestTimestamp *maxTimestamp + enableTypeAndUnitLabels bool } // NewWriteStorage creates and runs a WriteStorage. -func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage { +func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *WriteStorage { if logger == nil { logger = promslog.NewNopLogger() } @@ -101,6 +102,7 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet.", }), }, + enableTypeAndUnitLabels: enableTypeAndUnitLabels, } if reg != nil { reg.MustRegister(rws.highestTimestamp) @@ -211,6 +213,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.scraper, rwConf.SendExemplars, rwConf.SendNativeHistograms, + rws.enableTypeAndUnitLabels, rwConf.ProtobufMessage, ) // Keep track of which queues are new so we know which to start. diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index eb314f106f..7496a2c78c 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -118,7 +118,7 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: tc.cfgs, @@ -144,7 +144,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) { hash, err := toHash(cfg) require.NoError(t, err) - s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -166,7 +166,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) { func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false) c1 := &config.RemoteWriteConfig{ Name: "named", URL: &common_config.URL{ @@ -207,7 +207,7 @@ func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) { func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -223,7 +223,7 @@ func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) { func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false) externalLabels := labels.FromStrings("external", "true") conf := &config.Config{ @@ -251,7 +251,7 @@ func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) { func TestWriteStorageApplyConfig_Idempotent(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -275,7 +275,7 @@ func TestWriteStorageApplyConfig_Idempotent(t *testing.T) { func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) c0 := &config.RemoteWriteConfig{ RemoteTimeout: model.Duration(10 * time.Second), @@ -1082,7 +1082,7 @@ func TestWriteStorage_CanRegisterMetricsAfterClosing(t *testing.T) { dir := t.TempDir() reg := prometheus.NewPedanticRegistry() - s := NewWriteStorage(nil, reg, dir, time.Millisecond, nil) + s := NewWriteStorage(nil, reg, dir, time.Millisecond, nil, false) require.NoError(t, s.Close()) - require.NotPanics(t, func() { NewWriteStorage(nil, reg, dir, time.Millisecond, nil) }) + require.NotPanics(t, func() { NewWriteStorage(nil, reg, dir, time.Millisecond, nil, false) }) } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index bd3fba5c11..7dc1f812a0 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -91,7 +91,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) * t.Helper() dbDir := t.TempDir() - rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil) + rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false) t.Cleanup(func() { require.NoError(t, rs.Close()) }) @@ -737,7 +737,7 @@ func TestLockfile(t *testing.T) { tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { logger := promslog.NewNopLogger() reg := prometheus.NewRegistry() - rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil) + rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false) t.Cleanup(func() { require.NoError(t, rs.Close()) }) @@ -757,7 +757,7 @@ func TestLockfile(t *testing.T) { func Test_ExistingWAL_NextRef(t *testing.T) { dbDir := t.TempDir() - rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil) + rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false) defer func() { require.NoError(t, rs.Close()) }() diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 3c23d9f5c8..5c7bfbda05 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -496,7 +496,7 @@ func TestEndpoints(t *testing.T) { remote := remote.NewStorage(promslog.New(&promslogConfig), prometheus.DefaultRegisterer, func() (int64, error) { return 0, nil - }, dbDir, 1*time.Second, nil) + }, dbDir, 1*time.Second, nil, false) err = remote.ApplyConfig(&config.Config{ RemoteReadConfigs: []*config.RemoteReadConfig{