From bec70227f12c3f8614cf0cf7badf6a3ee7e4ea04 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 23 Jan 2026 09:04:05 +0000 Subject: [PATCH] feat(scrape)[PART5b]: Add AppenderV2 support to scrape.NewManager constructor (#17872) * feat(scrape)[PART5b]: Add AppenderV2 support to scrape.NewManager optionally to V1 Signed-off-by: bwplotka * Update scrape/manager.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Bartlomiej Plotka * fixes after rebase Signed-off-by: bwplotka * Apply suggestions from code review Co-authored-by: Arve Knudsen Signed-off-by: Bartlomiej Plotka --------- Signed-off-by: bwplotka Signed-off-by: Bartlomiej Plotka Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Arve Knudsen --- cmd/prometheus/main.go | 2 +- scrape/manager.go | 23 +++- scrape/manager_test.go | 40 +++---- scrape/scrape_test.go | 8 +- tsdb/head_append_v2_test.go | 205 +++++++++++++++++++++++++++++++++++- 5 files changed, 250 insertions(+), 28 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 210d3ddc4e..06d5540380 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -875,7 +875,7 @@ func main() { &cfg.scrape, logger.With("component", "scrape manager"), logging.NewJSONFileLogger, - fanoutStorage, + fanoutStorage, nil, // TODO(bwplotka): Switch to AppendableV2. prometheus.DefaultRegisterer, ) if err != nil { diff --git a/scrape/manager.go b/scrape/manager.go index ef226ad507..aafd8c1931 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -39,14 +39,32 @@ import ( "github.com/prometheus/prometheus/util/pool" ) -// NewManager is the Manager constructor using Appendable. -func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), appendable storage.Appendable, registerer prometheus.Registerer) (*Manager, error) { +// NewManager is the Manager constructor using storage.Appendable or storage.AppendableV2. +// +// If unsure which one to use/implement, implement AppendableV2 as it significantly simplifies implementation and allows more +// (passing ST, always-on metadata, exemplars per sample). +// +// NewManager returns error if both appendable and appendableV2 are specified. +// +// Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// storage.Appendable will be removed soon (ETA: Q2 2026). +func NewManager( + o *Options, + logger *slog.Logger, + newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), + appendable storage.Appendable, + appendableV2 storage.AppendableV2, + registerer prometheus.Registerer, +) (*Manager, error) { if o == nil { o = &Options{} } if logger == nil { logger = promslog.NewNopLogger() } + if appendable != nil && appendableV2 != nil { + return nil, errors.New("scrape.NewManager: appendable and appendableV2 cannot be provided at the same time") + } sm, err := newScrapeMetrics(registerer) if err != nil { @@ -55,6 +73,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str m := &Manager{ appendable: appendable, + appendableV2: appendableV2, opts: o, logger: logger, newScrapeFailureLogger: newScrapeFailureLogger, diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 8b289cb7e2..17152e8eb1 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -522,7 +522,7 @@ scrape_configs: ) opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) newLoop := func(scrapeLoopOptions) loop { ch <- struct{}{} @@ -578,7 +578,7 @@ scrape_configs: func TestManagerTargetsUpdates(t *testing.T) { opts := Options{} testRegistry := prometheus.NewRegistry() - m, err := NewManager(&opts, nil, nil, nil, testRegistry) + m, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) ts := make(chan map[string][]*targetgroup.Group) @@ -631,7 +631,7 @@ global: opts := Options{} testRegistry := prometheus.NewRegistry() - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) // Load the first config. @@ -701,7 +701,7 @@ scrape_configs: } opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) reload(scrapeManager, cfg1) @@ -735,6 +735,8 @@ func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server } // TestManagerSTZeroIngestion tests scrape manager for various ST cases. +// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's +// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample. func TestManagerSTZeroIngestion(t *testing.T) { t.Parallel() const ( @@ -766,7 +768,7 @@ func TestManagerSTZeroIngestion(t *testing.T) { discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: testSTZeroIngest, skipOffsetting: true, - }, app) + }, app, nil) defer scrapeManager.Stop() server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded) @@ -905,6 +907,8 @@ func generateTestHistogram(i int) *dto.Histogram { return h } +// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's +// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample. func TestManagerSTZeroIngestionHistogram(t *testing.T) { t.Parallel() const mName = "expected_histogram" @@ -950,7 +954,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) { discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion, skipOffsetting: true, - }, app) + }, app, nil) defer scrapeManager.Stop() once := sync.Once{} @@ -1030,7 +1034,7 @@ func TestUnregisterMetrics(t *testing.T) { // Check that all metrics can be unregistered, allowing a second manager to be created. for range 2 { opts := Options{} - manager, err := NewManager(&opts, nil, nil, nil, reg) + manager, err := NewManager(&opts, nil, nil, nil, nil, reg) require.NotNil(t, manager) require.NoError(t, err) // Unregister all metrics. @@ -1043,6 +1047,9 @@ func TestUnregisterMetrics(t *testing.T) { // This test addresses issue #17216 by ensuring the previously blocking check has been removed. // The test verifies that the presence of exemplars in the input does not cause errors, // although exemplars are not preserved during NHCB conversion (as documented below). +// +// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's +// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample. func TestNHCBAndSTZeroIngestion(t *testing.T) { t.Parallel() @@ -1059,7 +1066,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) { discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: true, skipOffsetting: true, - }, app) + }, app, nil) defer scrapeManager.Stop() once := sync.Once{} @@ -1153,16 +1160,13 @@ func applyConfig( require.NoError(t, discoveryManager.ApplyConfig(c)) } -func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable) (*discovery.Manager, *Manager) { +func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable, appV2 storage.AppendableV2) (*discovery.Manager, *Manager) { t.Helper() if opts == nil { opts = &Options{} } opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond) - if app == nil { - app = teststorage.NewAppendable() - } reg := prometheus.NewRegistry() sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) @@ -1178,7 +1182,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A opts, nil, nil, - app, + app, appV2, prometheus.NewRegistry(), ) require.NoError(t, err) @@ -1251,7 +1255,7 @@ scrape_configs: - files: ['%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() applyConfig( @@ -1350,7 +1354,7 @@ scrape_configs: file_sd_configs: - files: ['%s', '%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() applyConfig( @@ -1409,7 +1413,7 @@ scrape_configs: file_sd_configs: - files: ['%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() applyConfig( @@ -1475,7 +1479,7 @@ scrape_configs: - targets: ['%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() // Apply the initial config with an existing file @@ -1559,7 +1563,7 @@ scrape_configs: cfg := loadConfiguration(t, cfgText) - m, err := NewManager(&Options{}, nil, nil, teststorage.NewAppendable(), prometheus.NewRegistry()) + m, err := NewManager(&Options{}, nil, nil, nil, nil, prometheus.NewRegistry()) require.NoError(t, err) defer m.Stop() require.NoError(t, m.ApplyConfig(cfg)) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 74fdf8a962..f9a0834bd1 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -5751,14 +5751,10 @@ scrape_configs: s := teststorage.New(t) reg := prometheus.NewRegistry() - mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, s, reg) + sa := selectAppendable(s, appV2) + mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, sa.V1(), sa.V2(), reg) require.NoError(t, err) - if appV2 { - mng.appendableV2 = s - mng.appendable = nil - } - cfg, err := config.Load(configStr, promslog.NewNopLogger()) require.NoError(t, err) require.NoError(t, mng.ApplyConfig(cfg)) diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index 91f6ba81cc..20401c16fe 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -4111,10 +4111,18 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { // Make sure counter resets hints are non-zero, so we can detect ST histogram samples. testHistogram := tsdbutil.GenerateTestHistogram(1) testHistogram.CounterResetHint = histogram.NotCounterReset + testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1) testFloatHistogram.CounterResetHint = histogram.NotCounterReset + + testNHCB := tsdbutil.GenerateTestCustomBucketsHistogram(1) + testNHCB.CounterResetHint = histogram.NotCounterReset + + testFloatNHCB := tsdbutil.GenerateTestCustomBucketsFloatHistogram(1) + testFloatNHCB.CounterResetHint = histogram.NotCounterReset + // TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the - // following two zero histograms should be histogram.CounterReset. + // following zero histograms should be histogram.CounterReset. testZeroHistogram := &histogram.Histogram{ Schema: testHistogram.Schema, ZeroThreshold: testHistogram.ZeroThreshold, @@ -4131,6 +4139,19 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { PositiveBuckets: []float64{0, 0, 0, 0}, NegativeBuckets: []float64{0, 0, 0, 0}, } + testZeroNHCB := &histogram.Histogram{ + Schema: testNHCB.Schema, + PositiveSpans: testNHCB.PositiveSpans, + PositiveBuckets: []int64{0, 0, 0, 0}, + CustomValues: testNHCB.CustomValues, + } + testZeroFloatNHCB := &histogram.FloatHistogram{ + Schema: testFloatNHCB.Schema, + PositiveSpans: testFloatNHCB.PositiveSpans, + PositiveBuckets: []float64{0, 0, 0, 0}, + CustomValues: testFloatNHCB.CustomValues, + } + type appendableSamples struct { ts int64 fSample float64 @@ -4183,6 +4204,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "In order ct+normal sample/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 101, h: testNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testNHCB}, + } + }(), + }, + { + name: "In order ct+normal sample/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 101, fh: testFloatNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testFloatNHCB}, + } + }(), + }, { name: "Consecutive appends with same st ignore st/floatSample", appendableSamples: []appendableSamples{ @@ -4223,6 +4272,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "Consecutive appends with same st ignore st/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 101, h: testNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testNHCB}, + } + }(), + }, + { + name: "Consecutive appends with same st ignore st/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 101, fh: testFloatNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testFloatNHCB}, + } + }(), + }, { name: "Consecutive appends with newer st do not ignore st/floatSample", appendableSamples: []appendableSamples{ @@ -4262,6 +4339,32 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { sample{t: 102, fh: testFloatHistogram}, }, }, + { + name: "Consecutive appends with newer st do not ignore st/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 102, h: testNHCB, st: 101}, + }, + expectedSamples: []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testZeroNHCB}, + sample{t: 102, h: testNHCB}, + }, + }, + { + name: "Consecutive appends with newer st do not ignore st/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 102, fh: testFloatNHCB, st: 101}, + }, + expectedSamples: []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testZeroFloatNHCB}, + sample{t: 102, fh: testFloatNHCB}, + }, + }, { name: "ST equals to previous sample timestamp is ignored/floatSample", appendableSamples: []appendableSamples{ @@ -4302,6 +4405,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "ST equals to previous sample timestamp is ignored/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 101, h: testNHCB, st: 100}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testNHCB}, + } + }(), + }, + { + name: "ST equals to previous sample timestamp is ignored/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 101, fh: testFloatNHCB, st: 100}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testFloatNHCB}, + } + }(), + }, { name: "ST lower than minValidTime/float", appendableSamples: []appendableSamples{ @@ -4349,6 +4480,40 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "ST lower than minValidTime/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: -1}, + }, + // ST results ErrOutOfBounds, but ST append is best effort, so + // ST should be ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, h: firstSample}, + } + }(), + }, + { + name: "ST lower than minValidTime/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: -1}, + }, + // ST results ErrOutOfBounds, but ST append is best effort, so + // ST should be ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testFloatNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, fh: firstSample}, + } + }(), + }, { name: "ST duplicates an existing sample/float", appendableSamples: []appendableSamples{ @@ -4402,6 +4567,44 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "ST duplicates an existing sample/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB}, + {ts: 200, h: testNHCB, st: 100}, + }, + // ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so + // ST should be ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, h: firstSample}, + sample{t: 200, h: testNHCB}, + } + }(), + }, + { + name: "ST duplicates an existing sample/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB}, + {ts: 200, fh: testFloatNHCB, st: 100}, + }, + // ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so + // ST should ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testFloatNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, fh: firstSample}, + sample{t: 200, fh: testFloatNHCB}, + } + }(), + }, } { t.Run(tc.name, func(t *testing.T) { opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)