Mirror of https://github.com/prometheus/prometheus.git
Reapply "prw: use Unit and Type labels for metadata when feature flag is enabled (#17033)"

This reverts commit f5fab47577.

parent: f5fab47577
commit: 794bf774c2

10 changed files with 195 additions and 56 deletions
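Judging by the cfg.scrape.EnableTypeAndUnitLabels field threaded through main() in the first hunk below, the restored behaviour is gated on Prometheus's type-and-unit-labels feature flag; presumably it is switched on at startup along the lines of:

    prometheus --enable-feature=type-and-unit-labels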
@@ -782,7 +782,7 @@ func main() {
     var (
         localStorage  = &readyStorage{stats: tsdb.NewDBStats()}
         scraper       = &readyScrapeManager{}
-        remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
+        remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels)
         fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
     )

@@ -40,6 +40,7 @@ import (
     "github.com/prometheus/prometheus/model/timestamp"
     "github.com/prometheus/prometheus/prompb"
     writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
+    "github.com/prometheus/prometheus/schema"
     "github.com/prometheus/prometheus/scrape"
     "github.com/prometheus/prometheus/tsdb/chunks"
     "github.com/prometheus/prometheus/tsdb/record"

@@ -407,16 +408,17 @@ type QueueManager struct {
     reshardDisableStartTimestamp atomic.Int64 // Time that reshard was disabled.
     reshardDisableEndTimestamp   atomic.Int64 // Time that reshard is disabled until.

-    logger               *slog.Logger
-    flushDeadline        time.Duration
-    cfg                  config.QueueConfig
-    mcfg                 config.MetadataConfig
-    externalLabels       []labels.Label
-    relabelConfigs       []*relabel.Config
-    sendExemplars        bool
-    sendNativeHistograms bool
-    watcher              *wlog.Watcher
-    metadataWatcher      *MetadataWatcher
+    logger                  *slog.Logger
+    flushDeadline           time.Duration
+    cfg                     config.QueueConfig
+    mcfg                    config.MetadataConfig
+    externalLabels          []labels.Label
+    relabelConfigs          []*relabel.Config
+    sendExemplars           bool
+    sendNativeHistograms    bool
+    enableTypeAndUnitLabels bool
+    watcher                 *wlog.Watcher
+    metadataWatcher         *MetadataWatcher

     clientMtx   sync.RWMutex
     storeClient WriteClient

@@ -468,6 +470,7 @@ func NewQueueManager(
     sm ReadyScrapeManager,
     enableExemplarRemoteWrite bool,
     enableNativeHistogramRemoteWrite bool,
+    enableTypeAndUnitLabels bool,
     protoMsg config.RemoteWriteProtoMsg,
 ) *QueueManager {
     if logger == nil {

@@ -482,15 +485,16 @@ func NewQueueManager(

     logger = logger.With(remoteName, client.Name(), endpoint, client.Endpoint())
     t := &QueueManager{
-        logger:               logger,
-        flushDeadline:        flushDeadline,
-        cfg:                  cfg,
-        mcfg:                 mCfg,
-        externalLabels:       extLabelsSlice,
-        relabelConfigs:       relabelConfigs,
-        storeClient:          client,
-        sendExemplars:        enableExemplarRemoteWrite,
-        sendNativeHistograms: enableNativeHistogramRemoteWrite,
+        logger:                  logger,
+        flushDeadline:           flushDeadline,
+        cfg:                     cfg,
+        mcfg:                    mCfg,
+        externalLabels:          extLabelsSlice,
+        relabelConfigs:          relabelConfigs,
+        storeClient:             client,
+        sendExemplars:           enableExemplarRemoteWrite,
+        sendNativeHistograms:    enableNativeHistogramRemoteWrite,
+        enableTypeAndUnitLabels: enableTypeAndUnitLabels,

         seriesLabels:   make(map[chunks.HeadSeriesRef]labels.Labels),
         seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata),

@@ -1540,7 +1544,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
                 }
                 _ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, encBuf, compr)
             case config.RemoteWriteProtoMsgV2:
-                nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms)
+                nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms, s.qm.enableTypeAndUnitLabels)
                 n := nPendingSamples + nPendingExemplars + nPendingHistograms
                 if nUnexpectedMetadata > 0 {
                     s.qm.logger.Warn("unexpected metadata sType in populateV2TimeSeries", "count", nUnexpectedMetadata)

@@ -1911,7 +1915,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
     return accumulatedStats, err
 }

-func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int, int) {
+func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms, enableTypeAndUnitLabels bool) (int, int, int, int, int) {
     var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata int
     for nPending, d := range batch {
         pendingData[nPending].Samples = pendingData[nPending].Samples[:0]

@@ -1921,11 +1925,15 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
             pendingData[nPending].Metadata.UnitRef = symbolTable.Symbolize(d.metadata.Unit)
             nPendingMetadata++
         } else {
+            var m schema.Metadata
+            if enableTypeAndUnitLabels {
+                m = schema.NewMetadataFromLabels(d.seriesLabels)
+            }
             // Safeguard against sending garbage in case of not having metadata
             // for whatever reason.
-            pendingData[nPending].Metadata.Type = writev2.Metadata_METRIC_TYPE_UNSPECIFIED
+            pendingData[nPending].Metadata.Type = writev2.FromMetadataType(m.Type)
+            pendingData[nPending].Metadata.UnitRef = symbolTable.Symbolize(m.Unit)
             pendingData[nPending].Metadata.HelpRef = 0
-            pendingData[nPending].Metadata.UnitRef = 0
         }

         if sendExemplars {

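To make the new fallback concrete, here is a small standalone sketch of what the else-branch above computes, using only APIs visible in this diff (schema.NewMetadataFromLabels, writev2.FromMetadataType, and the v2 symbol table). The __type__/__unit__ label names are an assumption about what the type-and-unit-labels feature writes into series labels:

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/model/labels"
    writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
    "github.com/prometheus/prometheus/schema"
)

func main() {
    // A series whose metadata exists only as labels (label names assumed).
    lbls := labels.FromStrings(
        "__name__", "http_request_duration_seconds",
        "__type__", "histogram",
        "__unit__", "seconds",
    )

    st := writev2.NewSymbolTable()

    // With the flag on: recover Type/Unit from the series labels.
    m := schema.NewMetadataFromLabels(lbls)

    var md writev2.Metadata
    md.Type = writev2.FromMetadataType(m.Type) // METRIC_TYPE_HISTOGRAM here
    md.UnitRef = st.Symbolize(m.Unit)          // interns "seconds"
    md.HelpRef = 0                             // no help text available

    // With the flag off, m stays zero-valued and the same two calls yield
    // METRIC_TYPE_UNSPECIFIED and UnitRef 0 (the empty symbol), matching
    // the old hard-coded safeguard that this hunk replaces.
    fmt.Println(md.Type, st.Symbols()[md.UnitRef])
}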
@@ -42,6 +42,7 @@ import (
     "github.com/prometheus/prometheus/model/timestamp"
     "github.com/prometheus/prometheus/prompb"
     writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
+    "github.com/prometheus/prometheus/schema"
     "github.com/prometheus/prometheus/scrape"
     "github.com/prometheus/prometheus/tsdb/chunks"
     "github.com/prometheus/prometheus/tsdb/record"

@@ -132,7 +133,7 @@ func TestBasicContentNegotiation(t *testing.T) {
     } {
         t.Run(tc.name, func(t *testing.T) {
             dir := t.TempDir()
-            s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+            s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
             defer s.Close()

             var (

@@ -241,7 +242,7 @@ func TestSampleDelivery(t *testing.T) {
     } {
         t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) {
            dir := t.TempDir()
-            s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+            s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
             defer s.Close()

             var (

@@ -322,7 +323,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
 func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
     dir := t.TempDir()
     metrics := newQueueManagerMetrics(nil, "", "")
-    m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
+    m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg)

     return m
 }

@@ -363,7 +364,7 @@ func TestMetadataDelivery(t *testing.T) {

 func TestWALMetadataDelivery(t *testing.T) {
     dir := t.TempDir()
-    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
     defer s.Close()

     cfg := config.DefaultQueueConfig

@@ -782,7 +783,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
         }
     )

-    m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+    m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
     m.StoreSeries(fakeSeries, 0)

     // Attempt to samples while the manager is running. We immediately stop the

@@ -1459,7 +1460,7 @@ func BenchmarkStoreSeries(b *testing.B) {
     cfg := config.DefaultQueueConfig
     mcfg := config.DefaultMetadataConfig
     metrics := newQueueManagerMetrics(nil, "", "")
-    m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+    m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
     m.externalLabels = tc.externalLabels
     m.relabelConfigs = tc.relabelConfigs

@@ -1938,7 +1939,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) {

     totalSize := 0
     for i := 0; i < b.N; i++ {
-        populateV2TimeSeries(&symbolTable, batch, seriesBuff, true, true)
+        populateV2TimeSeries(&symbolTable, batch, seriesBuff, true, true, false)
         req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, nil, cEnc, "snappy")
         if err != nil {
             b.Fatal(err)

@@ -2344,8 +2345,7 @@ func TestPopulateV2TimeSeries_UnexpectedMetadata(t *testing.T) {
     }

     nSamples, nExemplars, nHistograms, nMetadata, nUnexpected := populateV2TimeSeries(
-        &symbolTable, batch, pendingData, false, false,
-    )
+        &symbolTable, batch, pendingData, false, false, false)

     require.Equal(t, 2, nSamples, "Should count 2 samples")
     require.Equal(t, 0, nExemplars, "Should count 0 exemplars")

@@ -2353,3 +2353,131 @@ func TestPopulateV2TimeSeries_UnexpectedMetadata(t *testing.T) {
     require.Equal(t, 0, nMetadata, "Should count 0 processed metadata")
     require.Equal(t, 2, nUnexpected, "Should count 2 unexpected metadata")
 }
+
+func TestPopulateV2TimeSeries_TypeAndUnitLabels(t *testing.T) {
+    symbolTable := writev2.NewSymbolTable()
+
+    testCases := []struct {
+        name         string
+        typeLabel    string
+        unitLabel    string
+        expectedType writev2.Metadata_MetricType
+        description  string
+    }{
+        {
+            name:         "counter_with_unit",
+            typeLabel:    "counter",
+            unitLabel:    "operations",
+            expectedType: writev2.Metadata_METRIC_TYPE_COUNTER,
+            description:  "Counter metric with operations unit",
+        },
+        {
+            name:         "gauge_with_bytes",
+            typeLabel:    "gauge",
+            unitLabel:    "bytes",
+            expectedType: writev2.Metadata_METRIC_TYPE_GAUGE,
+            description:  "Gauge metric with bytes unit",
+        },
+        {
+            name:         "histogram_with_seconds",
+            typeLabel:    "histogram",
+            unitLabel:    "seconds",
+            expectedType: writev2.Metadata_METRIC_TYPE_HISTOGRAM,
+            description:  "Histogram metric with seconds unit",
+        },
+        {
+            name:         "summary_with_ratio",
+            typeLabel:    "summary",
+            unitLabel:    "ratio",
+            expectedType: writev2.Metadata_METRIC_TYPE_SUMMARY,
+            description:  "Summary metric with ratio unit",
+        },
+        {
+            name:         "info_no_unit",
+            typeLabel:    "info",
+            unitLabel:    "",
+            expectedType: writev2.Metadata_METRIC_TYPE_INFO,
+            description:  "Info metric without unit",
+        },
+        {
+            name:         "stateset_no_unit",
+            typeLabel:    "stateset",
+            unitLabel:    "",
+            expectedType: writev2.Metadata_METRIC_TYPE_STATESET,
+            description:  "Stateset metric without unit",
+        },
+        {
+            name:         "unknown_type",
+            typeLabel:    "unknown_type",
+            unitLabel:    "meters",
+            expectedType: writev2.Metadata_METRIC_TYPE_UNSPECIFIED,
+            description:  "Unknown type defaults to unspecified",
+        },
+        {
+            name:         "empty_type_with_unit",
+            typeLabel:    "",
+            unitLabel:    "watts",
+            expectedType: writev2.Metadata_METRIC_TYPE_UNSPECIFIED,
+            description:  "Empty type with unit",
+        },
+        {
+            name:         "type_no_unit",
+            typeLabel:    "gauge",
+            unitLabel:    "",
+            expectedType: writev2.Metadata_METRIC_TYPE_GAUGE,
+            description:  "Type without unit",
+        },
+        {
+            name:         "no_type_no_unit",
+            typeLabel:    "",
+            unitLabel:    "",
+            expectedType: writev2.Metadata_METRIC_TYPE_UNSPECIFIED,
+            description:  "No type and no unit",
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            batch := make([]timeSeries, 1)
+            builder := labels.NewScratchBuilder(2)
+            metadata := schema.Metadata{
+                Name: "test_metric_" + tc.name,
+                Type: model.MetricType(tc.typeLabel),
+                Unit: tc.unitLabel,
+            }
+
+            metadata.AddToLabels(&builder)
+
+            batch[0] = timeSeries{
+                seriesLabels: builder.Labels(),
+                value:        123.45,
+                timestamp:    time.Now().UnixMilli(),
+                sType:        tSample,
+            }
+
+            pendingData := make([]writev2.TimeSeries, 1)
+
+            symbolTable.Reset()
+            nSamples, nExemplars, nHistograms, _, _ := populateV2TimeSeries(
+                &symbolTable,
+                batch,
+                pendingData,
+                false, // sendExemplars
+                false, // sendNativeHistograms
+                true,  // enableTypeAndUnitLabels
+            )
+
+            require.Equal(t, 1, nSamples, "Should have 1 sample")
+            require.Equal(t, 0, nExemplars, "Should have 0 exemplars")
+            require.Equal(t, 0, nHistograms, "Should have 0 histograms")
+
+            require.Equal(t, tc.expectedType, pendingData[0].Metadata.Type,
+                "Type should match expected for %s", tc.description)
+
+            unitRef := pendingData[0].Metadata.UnitRef
+
+            symbols := symbolTable.Symbols()
+            require.Equal(t, tc.unitLabel, symbols[unitRef], "Unit should match")
+        })
+    }
+}

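To run just this new test locally, a standard Go tooling invocation would be (assuming the upstream layout, where the queue manager tests live under storage/remote):

    go test ./storage/remote -run TestPopulateV2TimeSeries_TypeAndUnitLabels -v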
@@ -93,7 +93,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {

     for _, tc := range cases {
         t.Run("", func(t *testing.T) {
-            s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+            s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
             conf := &config.Config{
                 GlobalConfig:      config.DefaultGlobalConfig,
                 RemoteReadConfigs: tc.cfgs,

@@ -64,7 +64,7 @@ type Storage struct {
 }

 // NewStorage returns a remote.Storage.
-func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
+func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage {
     if l == nil {
         l = promslog.NewNopLogger()
     }

@@ -76,7 +76,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC
         deduper:                deduper,
         localStartTimeCallback: stCallback,
     }
-    s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
+    s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, enableTypeAndUnitLabels)
     return s
 }

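For downstream callers that construct a remote.Storage directly, the change above means one extra trailing boolean. A minimal sketch of the updated call, mirroring how the tests in this commit invoke it (nil collaborators are accepted and a nop logger is substituted); the "data/" directory is a placeholder:

package main

import (
    "time"

    "github.com/prometheus/prometheus/storage/remote"
)

func main() {
    // Trailing false leaves enableTypeAndUnitLabels off, i.e. the old behaviour.
    s := remote.NewStorage(nil, nil, nil, "data/", 30*time.Second, nil, false)
    defer s.Close()
}

Note that the flag is plumbed per server rather than per remote-write endpoint: in the ApplyConfig hunk further down, rws.enableTypeAndUnitLabels is passed to every queue alongside the per-config rwConf.SendExemplars and rwConf.SendNativeHistograms options.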
@@ -29,7 +29,7 @@ import (
 func TestStorageLifecycle(t *testing.T) {
     dir := t.TempDir()

-    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
     conf := &config.Config{
         GlobalConfig: config.DefaultGlobalConfig,
         RemoteWriteConfigs: []*config.RemoteWriteConfig{

@@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) {
 func TestUpdateRemoteReadConfigs(t *testing.T) {
     dir := t.TempDir()

-    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)

     conf := &config.Config{
         GlobalConfig: config.GlobalConfig{},

@@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
 func TestFilterExternalLabels(t *testing.T) {
     dir := t.TempDir()

-    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)

     conf := &config.Config{
         GlobalConfig: config.GlobalConfig{

@@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) {
 func TestIgnoreExternalLabels(t *testing.T) {
     dir := t.TempDir()

-    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)

     conf := &config.Config{
         GlobalConfig: config.GlobalConfig{

@@ -154,7 +154,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
 // ApplyConfig runs concurrently with Notify
 // See https://github.com/prometheus/prometheus/issues/12747
 func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) {
-    s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil)
+    s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false)

     var wg sync.WaitGroup
     wg.Add(2000)

@@ -73,11 +73,12 @@ type WriteStorage struct {
     quit chan struct{}

     // For timestampTracker.
-    highestTimestamp *maxTimestamp
+    highestTimestamp        *maxTimestamp
+    enableTypeAndUnitLabels bool
 }

 // NewWriteStorage creates and runs a WriteStorage.
-func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage {
+func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *WriteStorage {
     if logger == nil {
         logger = promslog.NewNopLogger()
     }

@@ -101,6 +102,7 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
                 Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet.",
             }),
         },
+        enableTypeAndUnitLabels: enableTypeAndUnitLabels,
     }
     if reg != nil {
         reg.MustRegister(rws.highestTimestamp)

@@ -211,6 +213,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
             rws.scraper,
             rwConf.SendExemplars,
             rwConf.SendNativeHistograms,
+            rws.enableTypeAndUnitLabels,
             rwConf.ProtobufMessage,
         )
         // Keep track of which queues are new so we know which to start.

@@ -118,7 +118,7 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
         },
     } {
         t.Run("", func(t *testing.T) {
-            s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
+            s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
             conf := &config.Config{
                 GlobalConfig:       config.DefaultGlobalConfig,
                 RemoteWriteConfigs: tc.cfgs,

@@ -144,7 +144,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
     hash, err := toHash(cfg)
     require.NoError(t, err)

-    s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
+    s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)

     conf := &config.Config{
         GlobalConfig: config.DefaultGlobalConfig,

@@ -166,7 +166,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
 func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
     dir := t.TempDir()

-    s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
+    s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
     c1 := &config.RemoteWriteConfig{
         Name: "named",
         URL: &common_config.URL{

@@ -207,7 +207,7 @@ func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
 func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
     dir := t.TempDir()

-    s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
     conf := &config.Config{
         GlobalConfig: config.DefaultGlobalConfig,
         RemoteWriteConfigs: []*config.RemoteWriteConfig{

@@ -223,7 +223,7 @@ func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
 func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
     dir := t.TempDir()

-    s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
+    s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)

     externalLabels := labels.FromStrings("external", "true")
     conf := &config.Config{

@@ -251,7 +251,7 @@ func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
 func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
     dir := t.TempDir()

-    s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
     conf := &config.Config{
         GlobalConfig: config.GlobalConfig{},
         RemoteWriteConfigs: []*config.RemoteWriteConfig{

@@ -275,7 +275,7 @@ func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
 func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) {
     dir := t.TempDir()

-    s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+    s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)

     c0 := &config.RemoteWriteConfig{
         RemoteTimeout: model.Duration(10 * time.Second),

@@ -1082,7 +1082,7 @@ func TestWriteStorage_CanRegisterMetricsAfterClosing(t *testing.T) {
     dir := t.TempDir()
     reg := prometheus.NewPedanticRegistry()

-    s := NewWriteStorage(nil, reg, dir, time.Millisecond, nil)
+    s := NewWriteStorage(nil, reg, dir, time.Millisecond, nil, false)
     require.NoError(t, s.Close())
-    require.NotPanics(t, func() { NewWriteStorage(nil, reg, dir, time.Millisecond, nil) })
+    require.NotPanics(t, func() { NewWriteStorage(nil, reg, dir, time.Millisecond, nil, false) })
 }

@@ -91,7 +91,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *
     t.Helper()

     dbDir := t.TempDir()
-    rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
+    rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
     t.Cleanup(func() {
         require.NoError(t, rs.Close())
     })

@@ -737,7 +737,7 @@ func TestLockfile(t *testing.T) {
     tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
         logger := promslog.NewNopLogger()
         reg := prometheus.NewRegistry()
-        rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
+        rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
         t.Cleanup(func() {
             require.NoError(t, rs.Close())
         })

@@ -757,7 +757,7 @@ func TestLockfile(t *testing.T) {

 func Test_ExistingWAL_NextRef(t *testing.T) {
     dbDir := t.TempDir()
-    rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil)
+    rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
     defer func() {
         require.NoError(t, rs.Close())
     }()

@@ -496,7 +496,7 @@ func TestEndpoints(t *testing.T) {

     remote := remote.NewStorage(promslog.New(&promslogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
         return 0, nil
-    }, dbDir, 1*time.Second, nil)
+    }, dbDir, 1*time.Second, nil, false)

     err = remote.ApplyConfig(&config.Config{
         RemoteReadConfigs: []*config.RemoteReadConfig{