feat(scrape)[PART5b]: Add AppenderV2 support to scrape.NewManager constructor (#17872)
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

* feat(scrape)[PART5b]: Add AppenderV2 support to scrape.NewManager optionally to V1

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Update scrape/manager.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* fixes after rebase

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Apply suggestions from code review

Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

---------

Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2026-01-23 09:04:05 +00:00 committed by GitHub
parent 0d116b0994
commit bec70227f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 250 additions and 28 deletions

View file

@ -875,7 +875,7 @@ func main() {
&cfg.scrape,
logger.With("component", "scrape manager"),
logging.NewJSONFileLogger,
fanoutStorage,
fanoutStorage, nil, // TODO(bwplotka): Switch to AppendableV2.
prometheus.DefaultRegisterer,
)
if err != nil {

View file

@ -39,14 +39,32 @@ import (
"github.com/prometheus/prometheus/util/pool"
)
// NewManager is the Manager constructor using Appendable.
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), appendable storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
// NewManager is the Manager constructor using storage.Appendable or storage.AppendableV2.
//
// If unsure which one to use/implement, implement AppendableV2 as it significantly simplifies implementation and allows more
// (passing ST, always-on metadata, exemplars per sample).
//
// NewManager returns error if both appendable and appendableV2 are specified.
//
// Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// storage.Appendable will be removed soon (ETA: Q2 2026).
func NewManager(
o *Options,
logger *slog.Logger,
newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error),
appendable storage.Appendable,
appendableV2 storage.AppendableV2,
registerer prometheus.Registerer,
) (*Manager, error) {
if o == nil {
o = &Options{}
}
if logger == nil {
logger = promslog.NewNopLogger()
}
if appendable != nil && appendableV2 != nil {
return nil, errors.New("scrape.NewManager: appendable and appendableV2 cannot be provided at the same time")
}
sm, err := newScrapeMetrics(registerer)
if err != nil {
@ -55,6 +73,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str
m := &Manager{
appendable: appendable,
appendableV2: appendableV2,
opts: o,
logger: logger,
newScrapeFailureLogger: newScrapeFailureLogger,

View file

@ -522,7 +522,7 @@ scrape_configs:
)
opts := Options{}
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
require.NoError(t, err)
newLoop := func(scrapeLoopOptions) loop {
ch <- struct{}{}
@ -578,7 +578,7 @@ scrape_configs:
func TestManagerTargetsUpdates(t *testing.T) {
opts := Options{}
testRegistry := prometheus.NewRegistry()
m, err := NewManager(&opts, nil, nil, nil, testRegistry)
m, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
require.NoError(t, err)
ts := make(chan map[string][]*targetgroup.Group)
@ -631,7 +631,7 @@ global:
opts := Options{}
testRegistry := prometheus.NewRegistry()
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
require.NoError(t, err)
// Load the first config.
@ -701,7 +701,7 @@ scrape_configs:
}
opts := Options{}
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
require.NoError(t, err)
reload(scrapeManager, cfg1)
@ -735,6 +735,8 @@ func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server
}
// TestManagerSTZeroIngestion tests scrape manager for various ST cases.
// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's
// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample.
func TestManagerSTZeroIngestion(t *testing.T) {
t.Parallel()
const (
@ -766,7 +768,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: testSTZeroIngest,
skipOffsetting: true,
}, app)
}, app, nil)
defer scrapeManager.Stop()
server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)
@ -905,6 +907,8 @@ func generateTestHistogram(i int) *dto.Histogram {
return h
}
// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's
// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample.
func TestManagerSTZeroIngestionHistogram(t *testing.T) {
t.Parallel()
const mName = "expected_histogram"
@ -950,7 +954,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
skipOffsetting: true,
}, app)
}, app, nil)
defer scrapeManager.Stop()
once := sync.Once{}
@ -1030,7 +1034,7 @@ func TestUnregisterMetrics(t *testing.T) {
// Check that all metrics can be unregistered, allowing a second manager to be created.
for range 2 {
opts := Options{}
manager, err := NewManager(&opts, nil, nil, nil, reg)
manager, err := NewManager(&opts, nil, nil, nil, nil, reg)
require.NotNil(t, manager)
require.NoError(t, err)
// Unregister all metrics.
@ -1043,6 +1047,9 @@ func TestUnregisterMetrics(t *testing.T) {
// This test addresses issue #17216 by ensuring the previously blocking check has been removed.
// The test verifies that the presence of exemplars in the input does not cause errors,
// although exemplars are not preserved during NHCB conversion (as documented below).
//
// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's
// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample.
func TestNHCBAndSTZeroIngestion(t *testing.T) {
t.Parallel()
@ -1059,7 +1066,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: true,
skipOffsetting: true,
}, app)
}, app, nil)
defer scrapeManager.Stop()
once := sync.Once{}
@ -1153,16 +1160,13 @@ func applyConfig(
require.NoError(t, discoveryManager.ApplyConfig(c))
}
func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable) (*discovery.Manager, *Manager) {
func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable, appV2 storage.AppendableV2) (*discovery.Manager, *Manager) {
t.Helper()
if opts == nil {
opts = &Options{}
}
opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond)
if app == nil {
app = teststorage.NewAppendable()
}
reg := prometheus.NewRegistry()
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
@ -1178,7 +1182,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
opts,
nil,
nil,
app,
app, appV2,
prometheus.NewRegistry(),
)
require.NoError(t, err)
@ -1251,7 +1255,7 @@ scrape_configs:
- files: ['%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
defer scrapeManager.Stop()
applyConfig(
@ -1350,7 +1354,7 @@ scrape_configs:
file_sd_configs:
- files: ['%s', '%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
defer scrapeManager.Stop()
applyConfig(
@ -1409,7 +1413,7 @@ scrape_configs:
file_sd_configs:
- files: ['%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
defer scrapeManager.Stop()
applyConfig(
@ -1475,7 +1479,7 @@ scrape_configs:
- targets: ['%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
defer scrapeManager.Stop()
// Apply the initial config with an existing file
@ -1559,7 +1563,7 @@ scrape_configs:
cfg := loadConfiguration(t, cfgText)
m, err := NewManager(&Options{}, nil, nil, teststorage.NewAppendable(), prometheus.NewRegistry())
m, err := NewManager(&Options{}, nil, nil, nil, nil, prometheus.NewRegistry())
require.NoError(t, err)
defer m.Stop()
require.NoError(t, m.ApplyConfig(cfg))

View file

@ -5751,14 +5751,10 @@ scrape_configs:
s := teststorage.New(t)
reg := prometheus.NewRegistry()
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, s, reg)
sa := selectAppendable(s, appV2)
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, sa.V1(), sa.V2(), reg)
require.NoError(t, err)
if appV2 {
mng.appendableV2 = s
mng.appendable = nil
}
cfg, err := config.Load(configStr, promslog.NewNopLogger())
require.NoError(t, err)
require.NoError(t, mng.ApplyConfig(cfg))

View file

@ -4111,10 +4111,18 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
// Make sure counter resets hints are non-zero, so we can detect ST histogram samples.
testHistogram := tsdbutil.GenerateTestHistogram(1)
testHistogram.CounterResetHint = histogram.NotCounterReset
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
testFloatHistogram.CounterResetHint = histogram.NotCounterReset
testNHCB := tsdbutil.GenerateTestCustomBucketsHistogram(1)
testNHCB.CounterResetHint = histogram.NotCounterReset
testFloatNHCB := tsdbutil.GenerateTestCustomBucketsFloatHistogram(1)
testFloatNHCB.CounterResetHint = histogram.NotCounterReset
// TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the
// following two zero histograms should be histogram.CounterReset.
// following zero histograms should be histogram.CounterReset.
testZeroHistogram := &histogram.Histogram{
Schema: testHistogram.Schema,
ZeroThreshold: testHistogram.ZeroThreshold,
@ -4131,6 +4139,19 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
PositiveBuckets: []float64{0, 0, 0, 0},
NegativeBuckets: []float64{0, 0, 0, 0},
}
testZeroNHCB := &histogram.Histogram{
Schema: testNHCB.Schema,
PositiveSpans: testNHCB.PositiveSpans,
PositiveBuckets: []int64{0, 0, 0, 0},
CustomValues: testNHCB.CustomValues,
}
testZeroFloatNHCB := &histogram.FloatHistogram{
Schema: testFloatNHCB.Schema,
PositiveSpans: testFloatNHCB.PositiveSpans,
PositiveBuckets: []float64{0, 0, 0, 0},
CustomValues: testFloatNHCB.CustomValues,
}
type appendableSamples struct {
ts int64
fSample float64
@ -4183,6 +4204,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
}
}(),
},
{
name: "In order ct+normal sample/NHCB",
appendableSamples: []appendableSamples{
{ts: 100, h: testNHCB, st: 1},
{ts: 101, h: testNHCB, st: 1},
},
expectedSamples: func() []chunks.Sample {
return []chunks.Sample{
sample{t: 1, h: testZeroNHCB},
sample{t: 100, h: testNHCB},
sample{t: 101, h: testNHCB},
}
}(),
},
{
name: "In order ct+normal sample/floatNHCB",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatNHCB, st: 1},
{ts: 101, fh: testFloatNHCB, st: 1},
},
expectedSamples: func() []chunks.Sample {
return []chunks.Sample{
sample{t: 1, fh: testZeroFloatNHCB},
sample{t: 100, fh: testFloatNHCB},
sample{t: 101, fh: testFloatNHCB},
}
}(),
},
{
name: "Consecutive appends with same st ignore st/floatSample",
appendableSamples: []appendableSamples{
@ -4223,6 +4272,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
}
}(),
},
{
name: "Consecutive appends with same st ignore st/NHCB",
appendableSamples: []appendableSamples{
{ts: 100, h: testNHCB, st: 1},
{ts: 101, h: testNHCB, st: 1},
},
expectedSamples: func() []chunks.Sample {
return []chunks.Sample{
sample{t: 1, h: testZeroNHCB},
sample{t: 100, h: testNHCB},
sample{t: 101, h: testNHCB},
}
}(),
},
{
name: "Consecutive appends with same st ignore st/floatNHCB",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatNHCB, st: 1},
{ts: 101, fh: testFloatNHCB, st: 1},
},
expectedSamples: func() []chunks.Sample {
return []chunks.Sample{
sample{t: 1, fh: testZeroFloatNHCB},
sample{t: 100, fh: testFloatNHCB},
sample{t: 101, fh: testFloatNHCB},
}
}(),
},
{
name: "Consecutive appends with newer st do not ignore st/floatSample",
appendableSamples: []appendableSamples{
@ -4262,6 +4339,32 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
sample{t: 102, fh: testFloatHistogram},
},
},
{
name: "Consecutive appends with newer st do not ignore st/NHCB",
appendableSamples: []appendableSamples{
{ts: 100, h: testNHCB, st: 1},
{ts: 102, h: testNHCB, st: 101},
},
expectedSamples: []chunks.Sample{
sample{t: 1, h: testZeroNHCB},
sample{t: 100, h: testNHCB},
sample{t: 101, h: testZeroNHCB},
sample{t: 102, h: testNHCB},
},
},
{
name: "Consecutive appends with newer st do not ignore st/floatNHCB",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatNHCB, st: 1},
{ts: 102, fh: testFloatNHCB, st: 101},
},
expectedSamples: []chunks.Sample{
sample{t: 1, fh: testZeroFloatNHCB},
sample{t: 100, fh: testFloatNHCB},
sample{t: 101, fh: testZeroFloatNHCB},
sample{t: 102, fh: testFloatNHCB},
},
},
{
name: "ST equals to previous sample timestamp is ignored/floatSample",
appendableSamples: []appendableSamples{
@ -4302,6 +4405,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
}
}(),
},
{
name: "ST equals to previous sample timestamp is ignored/NHCB",
appendableSamples: []appendableSamples{
{ts: 100, h: testNHCB, st: 1},
{ts: 101, h: testNHCB, st: 100},
},
expectedSamples: func() []chunks.Sample {
return []chunks.Sample{
sample{t: 1, h: testZeroNHCB},
sample{t: 100, h: testNHCB},
sample{t: 101, h: testNHCB},
}
}(),
},
{
name: "ST equals to previous sample timestamp is ignored/floatNHCB",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatNHCB, st: 1},
{ts: 101, fh: testFloatNHCB, st: 100},
},
expectedSamples: func() []chunks.Sample {
return []chunks.Sample{
sample{t: 1, fh: testZeroFloatNHCB},
sample{t: 100, fh: testFloatNHCB},
sample{t: 101, fh: testFloatNHCB},
}
}(),
},
{
name: "ST lower than minValidTime/float",
appendableSamples: []appendableSamples{
@ -4349,6 +4480,40 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
}
}(),
},
{
name: "ST lower than minValidTime/NHCB",
appendableSamples: []appendableSamples{
{ts: 100, h: testNHCB, st: -1},
},
// ST results ErrOutOfBounds, but ST append is best effort, so
// ST should be ignored, but sample appended.
expectedSamples: func() []chunks.Sample {
// NOTE: Without ST, on query, first histogram sample will get
// CounterReset adjusted to 0.
firstSample := testNHCB.Copy()
firstSample.CounterResetHint = histogram.UnknownCounterReset
return []chunks.Sample{
sample{t: 100, h: firstSample},
}
}(),
},
{
name: "ST lower than minValidTime/floatNHCB",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatNHCB, st: -1},
},
// ST results ErrOutOfBounds, but ST append is best effort, so
// ST should be ignored, but sample appended.
expectedSamples: func() []chunks.Sample {
// NOTE: Without ST, on query, first histogram sample will get
// CounterReset adjusted to 0.
firstSample := testFloatNHCB.Copy()
firstSample.CounterResetHint = histogram.UnknownCounterReset
return []chunks.Sample{
sample{t: 100, fh: firstSample},
}
}(),
},
{
name: "ST duplicates an existing sample/float",
appendableSamples: []appendableSamples{
@ -4402,6 +4567,44 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
}
}(),
},
{
name: "ST duplicates an existing sample/NHCB",
appendableSamples: []appendableSamples{
{ts: 100, h: testNHCB},
{ts: 200, h: testNHCB, st: 100},
},
// ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so
// ST should be ignored, but sample appended.
expectedSamples: func() []chunks.Sample {
// NOTE: Without ST, on query, first histogram sample will get
// CounterReset adjusted to 0.
firstSample := testNHCB.Copy()
firstSample.CounterResetHint = histogram.UnknownCounterReset
return []chunks.Sample{
sample{t: 100, h: firstSample},
sample{t: 200, h: testNHCB},
}
}(),
},
{
name: "ST duplicates an existing sample/floatNHCB",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatNHCB},
{ts: 200, fh: testFloatNHCB, st: 100},
},
// ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so
// ST should ignored, but sample appended.
expectedSamples: func() []chunks.Sample {
// NOTE: Without ST, on query, first histogram sample will get
// CounterReset adjusted to 0.
firstSample := testFloatNHCB.Copy()
firstSample.CounterResetHint = histogram.UnknownCounterReset
return []chunks.Sample{
sample{t: 100, fh: firstSample},
sample{t: 200, fh: testFloatNHCB},
}
}(),
},
} {
t.Run(tc.name, func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)