scrape: add option to manager to allow scraping at shutdown; add initial offset option (#18067)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Compliance testing (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

* Adding scape on shutdown

Signed-off-by: avilevy <avilevy@google.com>

* scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely

Signed-off-by: avilevy <avilevy@google.com>

* renamed calculateScrapeOffset to getScrapeOffset

Signed-off-by: avilevy <avilevy@google.com>

* test(scrape): refactor time-based manager tests to use synctest

Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests.

Add TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown to use the testing/synctest package, completely eliminating real-world time.Sleep delays and making the assertions 100% deterministic.

- Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble.
- Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable.
- Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest.
- Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven.

Signed-off-by: avilevy <avilevy@google.com>

* Clarify use cases in InitialScrapeOffset comment

Signed-off-by: avilevy <avilevy@google.com>

* test(scrape): use httptest for mock server to respect context cancellation

- Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`.
- Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports.
- Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts.
- Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios.
- Renamed `skipOffsetting` to `skipJitterOffsetting`.
- Addressed other PR comments.

Signed-off-by: avilevy <avilevy@google.com>

* tmp

Signed-off-by: bwplotka <bwplotka@gmail.com>

* exp2

Signed-off-by: bwplotka <bwplotka@gmail.com>

* fix

Signed-off-by: bwplotka <bwplotka@gmail.com>

* scrape: fix scrapeOnShutdown context bug and refactor test helpers
The scrapeOnShutdown feature was failing during manager shutdown because
the scrape pool context was being cancelled before the final shutdown
scrapes could execute. Fix this by delaying context cancellation
in scrapePool.stop() until after all scrape loops have stopped.
In addition:
- Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset.
- Refactored network test helper functions from manager_test.go to
  helpers_test.go.
- Addressed other comments.

Signed-off-by: avilevy <avilevy@google.com>

* Update scrape/scrape.go

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>

---------

Signed-off-by: avilevy <avilevy@google.com>
Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
Co-authored-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
avilevy18 2026-03-17 06:02:11 -04:00 committed by GitHub
parent c669470f07
commit bdfb3fc232
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 344 additions and 41 deletions

View file

@ -18,17 +18,24 @@ import (
"context"
"encoding/binary"
"fmt"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"go.yaml.in/yaml/v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -233,3 +240,132 @@ func TestSelectAppendable(t *testing.T) {
}
})
}
// pipeListener is an in-memory net.Listener that connects a custom DialContext
// directly to the httptest Server without opening real OS ports.
type pipeListener struct {
conns chan net.Conn
closed chan struct{}
once sync.Once
}
func newPipeListener() *pipeListener {
return &pipeListener{
conns: make(chan net.Conn),
closed: make(chan struct{}),
}
}
func (l *pipeListener) Accept() (net.Conn, error) {
select {
case c := <-l.conns:
return c, nil
case <-l.closed:
return nil, net.ErrClosed
}
}
func (l *pipeListener) Close() error {
l.once.Do(func() { close(l.closed) })
return nil
}
// Dummy Addr implementation to satisfy the net.Listener interface.
type pipeAddr struct{}
func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string { return "pipe" }
func (*pipeListener) Addr() net.Addr { return pipeAddr{} }
// startFakeHTTPServer spins up a httptest.Server bound to an in-memory
// pipeListener. It returns the listener (to be wired to a custom dialer) and a
// cleanup function to shut down the server.
func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) {
t.Helper()
listener := newPipeListener()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Abort if the request context is canceled (e.g., due to a scrape timeout).
select {
case <-r.Context().Done():
return
default:
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
fmt.Fprintln(w, "expected_metric 1")
}
})
srv := httptest.NewUnstartedServer(handler)
srv.Listener = listener
// Background goroutines inherit the synctest bubble safely.
srv.Start()
return listener, srv.Close
}
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) {
t.Helper()
app := teststorage.NewAppendable()
listener, cleanup := startFakeHTTPServer(t)
if opts == nil {
opts = &Options{}
}
opts.skipJitterOffsetting = true
// Ensure the scraper creates a new net.Pipe on every dial attempt
// and hands the server-side connection to the mock server's listener.
opts.HTTPClientOptions = []config_util.HTTPClientOption{
config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
srvConn, cliConn := net.Pipe()
select {
case listener.conns <- srvConn:
// Give the client side to the scraper.
return cliConn, nil
case <-listener.closed:
return nil, net.ErrClosed
case <-ctx.Done():
return nil, ctx.Err()
}
}),
}
scrapeManager, err := NewManager(
opts,
promslog.New(&promslog.Config{}),
nil, nil, app, prometheus.NewRegistry(),
)
require.NoError(t, err)
cfg := &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(interval),
ScrapeTimeout: model.Duration(interval),
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}
cfgText, err := yaml.Marshal(*cfg)
require.NoError(t, err)
cfg = loadConfiguration(t, string(cfgText))
require.NoError(t, scrapeManager.ApplyConfig(cfg))
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: "http",
model.AddressLabel: "test.local",
}},
}},
})
scrapeManager.reload()
return scrapeManager, app, cleanup
}

View file

@ -126,8 +126,31 @@ type Options struct {
// FeatureRegistry is the registry for tracking enabled/disabled features.
FeatureRegistry features.Collector
// ScrapeOnShutdown enables a final scrape before the manager closes. This is useful
// for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless
// job scenarios, allowing an extra scrape for the short-living edge cases.
//
// NOTE: This final scrape ignores the configured scrape interval.
ScrapeOnShutdown bool
// InitialScrapeOffset applies an additional baseline delay before we begin
// scraping targets. By default, Prometheus calculates a specific offset for
// each target to spread the scraping load evenly across the server. Configuring
// this option adds a fixed duration to that target-specific offset. This allows
// tuning the initial startup delay without overriding the underlying target
// jitter, preserving proper load balancing across the scraper pools.
//
// Setting this offset (e.g., to 10s) is particularly useful in Prometheus
// agent mode and OTel's prometheusreceiver when used in serverless job
// scenarios. It helps avoid readiness races where targets might not be fully
// initialized immediately upon startup. It also prevents capturing
// intermediate state (such as applications crashing shortly after booting),
// and ensures backend rate limits don't drop valuable shutdown scrapes
// because of an early startup scrape.
InitialScrapeOffset time.Duration
// private option for testability.
skipOffsetting bool
skipJitterOffsetting bool
}
// Manager maintains a set of scrape pools and manages start/stop cycles
@ -316,8 +339,16 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.scrapeFailureLoggers = scrapeFailureLoggers
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
// Skip offset seed calculation during tests.
// setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using
// a global singleflight goroutine. This cross-boundary communication breaks
// synctest's isolation bubble and causes a fatal panic.
if m.opts.skipJitterOffsetting {
m.offsetSeed = 0
} else {
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}
}
// Cleanup and reload pool if the configuration has changed.

View file

@ -53,6 +53,7 @@ import (
"github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/util/testutil/synctest"
)
func TestPopulateLabels(t *testing.T) {
@ -767,7 +768,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: testSTZeroIngest,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -953,7 +954,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1065,7 +1066,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: true,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1584,7 +1585,7 @@ scrape_configs:
// Disable end of run staleness markers for some targets.
m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
// This should be a no-op
// This should be a no-op.
m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)
// Check that the end of run staleness markers are disabled for the correct targets.
@ -1596,3 +1597,117 @@ scrape_configs:
}
}
}
func TestManager_InitialScrapeOffset(t *testing.T) {
interval := 10 * time.Second
for _, tcase := range []struct {
name string
initialScrapeOffset time.Duration
runDuration time.Duration
expectedSamples int
}{
{
name: "zero offset scrapes immediately",
expectedSamples: 1,
},
{
name: "zero offset scrapes twice after one interval",
runDuration: interval,
expectedSamples: 2,
},
{
name: "large offset prevents immediate scrape",
initialScrapeOffset: 1 * time.Hour,
runDuration: 59 * time.Minute,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
defer cleanupConns()
// Wait for the scrape manager to block on its timers.
synctest.Wait()
// Fast-forward the fake clock by the test case's run duration.
time.Sleep(tcase.runDuration)
synctest.Wait()
// Stop the manager to clean up background goroutines.
scrapeManager.Stop()
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples)
})
})
}
}
func TestManager_ScrapeOnShutdown(t *testing.T) {
interval := 10 * time.Second
for _, tcase := range []struct {
name string
scrapeOnShutdown bool
initialScrapeOffset time.Duration
runDuration time.Duration
expectedSamplesTotal int
}{
{
name: "no scrape on shutdown",
scrapeOnShutdown: false,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown",
scrapeOnShutdown: true,
expectedSamplesTotal: 2,
},
{
name: "scrape on shutdown after some scrapes",
scrapeOnShutdown: true,
runDuration: interval,
expectedSamplesTotal: 3,
},
{
name: "scrape on shutdown with initial offset",
scrapeOnShutdown: true,
initialScrapeOffset: 10 * time.Second,
runDuration: 5 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown with short running instance (offset 5s)",
scrapeOnShutdown: true,
initialScrapeOffset: 5 * time.Second,
runDuration: 8 * time.Second,
expectedSamplesTotal: 2,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{
ScrapeOnShutdown: tcase.scrapeOnShutdown,
InitialScrapeOffset: tcase.initialScrapeOffset,
}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
defer cleanupConns()
// Wait for the initial scrape to happen exactly at t=0.
synctest.Wait()
// Fast-forward fake time to simulate scheduled scrapes before shutdown.
if tcase.runDuration > 0 {
time.Sleep(tcase.runDuration)
synctest.Wait()
}
// Stop the manager. This triggers the ScrapeOnShutdown logic synchronously.
scrapeManager.Stop()
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal)
})
})
}
}

View file

@ -238,7 +238,6 @@ func (sp *scrapePool) SetScrapeFailureLogger(l FailureLogger) {
func (sp *scrapePool) stop() {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.cancel()
var wg sync.WaitGroup
sp.targetMtx.Lock()
@ -258,6 +257,10 @@ func (sp *scrapePool) stop() {
sp.targetMtx.Unlock()
wg.Wait()
// Cancel the context after all loops have stopped. This is required for
// scrapeOnShutdown to work properly, as the shutdown scrape uses this
// context (via sl.parentCtx) and would fail if the context was cancelled early.
sp.cancel()
sp.client.CloseIdleConnections()
if sp.config != nil {
@ -820,11 +823,17 @@ type cacheEntry struct {
}
type scrapeLoop struct {
// Parameters.
ctx context.Context
cancel func()
stopped chan struct{}
parentCtx context.Context
// ctx represents a local context that is cancellable via s.cancel.
// It's meant to synchronize run() with stop().
// It inherits parentCtx.
ctx context.Context
cancel func()
stopped chan struct{}
// parentCtx represents manager-level context, typically connected
// to process shutdown.
parentCtx context.Context
// appenderCtx is a parentCtx with some extra context for appender
// implementations. Potentially remove-able with removal of AppenderV1.
appenderCtx context.Context
l *slog.Logger
cache *scrapeCache
@ -865,8 +874,9 @@ type scrapeLoop struct {
reportExtraMetrics bool
appendMetadataToWAL bool
passMetadataInContext bool
skipOffsetting bool // For testability.
skipJitterOffsetting bool // For testability.
scrapeOnShutdown bool
initialScrapeOffset time.Duration
// error injection through setForcedError.
forcedErr error
forcedErrMtx sync.Mutex
@ -1218,7 +1228,9 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
appendMetadataToWAL: opts.sp.options.AppendMetadata,
passMetadataInContext: opts.sp.options.PassMetadataInContext,
skipOffsetting: opts.sp.options.skipOffsetting,
skipJitterOffsetting: opts.sp.options.skipJitterOffsetting,
scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown,
initialScrapeOffset: opts.sp.options.InitialScrapeOffset,
}
}
@ -1231,31 +1243,49 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
sl.scrapeFailureLogger = l
}
func (sl *scrapeLoop) getScrapeOffset() time.Duration {
offset := sl.scraper.offset(sl.interval, sl.offsetSeed)
if sl.skipJitterOffsetting {
offset = time.Duration(0)
}
return sl.initialScrapeOffset + offset
}
func (sl *scrapeLoop) run(errc chan<- error) {
if !sl.skipOffsetting {
var (
last time.Time
alignedScrapeTime = time.Now().Round(0)
ticker = time.NewTicker(sl.interval)
)
defer func() {
if sl.scrapeOnShutdown {
last = sl.scrapeAndReport(last, time.Now().Round(0), errc)
}
// Let the stop() know it can continue.
close(sl.stopped)
if sl.parentCtx.Err() == nil {
if !sl.disabledEndOfRunStalenessMarkers.Load() {
sl.endOfRunStaleness(last, ticker, sl.interval)
}
}
ticker.Stop()
}()
// Initial offset and jitter offset, if any.
offset := sl.getScrapeOffset()
if offset > 0 {
select {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
case <-time.After(offset):
// Continue after a scraping offset.
case <-sl.ctx.Done():
close(sl.stopped)
return
}
}
var last time.Time
alignedScrapeTime := time.Now().Round(0)
ticker := time.NewTicker(sl.interval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
return
default:
}
@ -1282,20 +1312,11 @@ mainLoop:
last = sl.scrapeAndReport(last, scrapeTime, errc)
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
return
case <-ticker.C:
}
}
close(sl.stopped)
if !sl.disabledEndOfRunStalenessMarkers.Load() {
sl.endOfRunStaleness(last, ticker, sl.interval)
}
}
func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {

View file

@ -6653,7 +6653,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) {
}
sa := selectAppendable(s, appV2)
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipJitterOffsetting: true}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()