diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 1db229561d..45c89ad8d7 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -18,17 +18,24 @@ import ( "context" "encoding/binary" "fmt" + "net" "net/http" + "net/http/httptest" + "sync" "testing" "time" "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" + "go.yaml.in/yaml/v2" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -233,3 +240,132 @@ func TestSelectAppendable(t *testing.T) { } }) } + +// pipeListener is an in-memory net.Listener that connects a custom DialContext +// directly to the httptest Server without opening real OS ports. +type pipeListener struct { + conns chan net.Conn + closed chan struct{} + once sync.Once +} + +func newPipeListener() *pipeListener { + return &pipeListener{ + conns: make(chan net.Conn), + closed: make(chan struct{}), + } +} + +func (l *pipeListener) Accept() (net.Conn, error) { + select { + case c := <-l.conns: + return c, nil + case <-l.closed: + return nil, net.ErrClosed + } +} + +func (l *pipeListener) Close() error { + l.once.Do(func() { close(l.closed) }) + return nil +} + +// Dummy Addr implementation to satisfy the net.Listener interface. +type pipeAddr struct{} + +func (pipeAddr) Network() string { return "pipe" } +func (pipeAddr) String() string { return "pipe" } +func (*pipeListener) Addr() net.Addr { return pipeAddr{} } + +// startFakeHTTPServer spins up a httptest.Server bound to an in-memory +// pipeListener. It returns the listener (to be wired to a custom dialer) and a +// cleanup function to shut down the server. +func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) { + t.Helper() + + listener := newPipeListener() + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Abort if the request context is canceled (e.g., due to a scrape timeout). + select { + case <-r.Context().Done(): + return + default: + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintln(w, "expected_metric 1") + } + }) + + srv := httptest.NewUnstartedServer(handler) + srv.Listener = listener + + // Background goroutines inherit the synctest bubble safely. + srv.Start() + + return listener, srv.Close +} + +// setupSynctestManager abstracts the boilerplate of creating a mock network, +// starting the fake HTTP server, and configuring the scrape manager for synctest. +func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { + t.Helper() + app := teststorage.NewAppendable() + + listener, cleanup := startFakeHTTPServer(t) + + if opts == nil { + opts = &Options{} + } + opts.skipJitterOffsetting = true + + // Ensure the scraper creates a new net.Pipe on every dial attempt + // and hands the server-side connection to the mock server's listener. + opts.HTTPClientOptions = []config_util.HTTPClientOption{ + config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { + srvConn, cliConn := net.Pipe() + + select { + case listener.conns <- srvConn: + // Give the client side to the scraper. + return cliConn, nil + case <-listener.closed: + return nil, net.ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + } + }), + } + + scrapeManager, err := NewManager( + opts, + promslog.New(&promslog.Config{}), + nil, nil, app, prometheus.NewRegistry(), + ) + require.NoError(t, err) + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(interval), + ScrapeTimeout: model.Duration(interval), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: "http", + model.AddressLabel: "test.local", + }}, + }}, + }) + + scrapeManager.reload() + + return scrapeManager, app, cleanup +} diff --git a/scrape/manager.go b/scrape/manager.go index 24a63b056b..e632b015d7 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -126,8 +126,31 @@ type Options struct { // FeatureRegistry is the registry for tracking enabled/disabled features. FeatureRegistry features.Collector + // ScrapeOnShutdown enables a final scrape before the manager closes. This is useful + // for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless + // job scenarios, allowing an extra scrape for the short-living edge cases. + // + // NOTE: This final scrape ignores the configured scrape interval. + ScrapeOnShutdown bool + + // InitialScrapeOffset applies an additional baseline delay before we begin + // scraping targets. By default, Prometheus calculates a specific offset for + // each target to spread the scraping load evenly across the server. Configuring + // this option adds a fixed duration to that target-specific offset. This allows + // tuning the initial startup delay without overriding the underlying target + // jitter, preserving proper load balancing across the scraper pools. + // + // Setting this offset (e.g., to 10s) is particularly useful in Prometheus + // agent mode and OTel's prometheusreceiver when used in serverless job + // scenarios. It helps avoid readiness races where targets might not be fully + // initialized immediately upon startup. It also prevents capturing + // intermediate state (such as applications crashing shortly after booting), + // and ensures backend rate limits don't drop valuable shutdown scrapes + // because of an early startup scrape. + InitialScrapeOffset time.Duration + // private option for testability. - skipOffsetting bool + skipJitterOffsetting bool } // Manager maintains a set of scrape pools and manages start/stop cycles @@ -316,8 +339,16 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.scrapeFailureLoggers = scrapeFailureLoggers - if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { - return err + // Skip offset seed calculation during tests. + // setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using + // a global singleflight goroutine. This cross-boundary communication breaks + // synctest's isolation bubble and causes a fatal panic. + if m.opts.skipJitterOffsetting { + m.offsetSeed = 0 + } else { + if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { + return err + } } // Cleanup and reload pool if the configuration has changed. diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 395cc98a82..3dc05db011 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -53,6 +53,7 @@ import ( "github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestPopulateLabels(t *testing.T) { @@ -767,7 +768,7 @@ func TestManagerSTZeroIngestion(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: testSTZeroIngest, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -953,7 +954,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1065,7 +1066,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: true, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1584,7 +1585,7 @@ scrape_configs: // Disable end of run staleness markers for some targets. m.DisableEndOfRunStalenessMarkers("one", targetsToDisable) - // This should be a no-op + // This should be a no-op. m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable) // Check that the end of run staleness markers are disabled for the correct targets. @@ -1596,3 +1597,117 @@ scrape_configs: } } } + +func TestManager_InitialScrapeOffset(t *testing.T) { + interval := 10 * time.Second + + for _, tcase := range []struct { + name string + initialScrapeOffset time.Duration + runDuration time.Duration + expectedSamples int + }{ + { + name: "zero offset scrapes immediately", + expectedSamples: 1, + }, + { + name: "zero offset scrapes twice after one interval", + runDuration: interval, + expectedSamples: 2, + }, + { + name: "large offset prevents immediate scrape", + initialScrapeOffset: 1 * time.Hour, + runDuration: 59 * time.Minute, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset} + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + defer cleanupConns() + + // Wait for the scrape manager to block on its timers. + synctest.Wait() + + // Fast-forward the fake clock by the test case's run duration. + time.Sleep(tcase.runDuration) + synctest.Wait() + + // Stop the manager to clean up background goroutines. + scrapeManager.Stop() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples) + }) + }) + } +} + +func TestManager_ScrapeOnShutdown(t *testing.T) { + interval := 10 * time.Second + + for _, tcase := range []struct { + name string + scrapeOnShutdown bool + initialScrapeOffset time.Duration + runDuration time.Duration + expectedSamplesTotal int + }{ + { + name: "no scrape on shutdown", + scrapeOnShutdown: false, + expectedSamplesTotal: 1, + }, + { + name: "scrape on shutdown", + scrapeOnShutdown: true, + expectedSamplesTotal: 2, + }, + { + name: "scrape on shutdown after some scrapes", + scrapeOnShutdown: true, + runDuration: interval, + expectedSamplesTotal: 3, + }, + { + name: "scrape on shutdown with initial offset", + scrapeOnShutdown: true, + initialScrapeOffset: 10 * time.Second, + runDuration: 5 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "scrape on shutdown with short running instance (offset 5s)", + scrapeOnShutdown: true, + initialScrapeOffset: 5 * time.Second, + runDuration: 8 * time.Second, + expectedSamplesTotal: 2, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{ + ScrapeOnShutdown: tcase.scrapeOnShutdown, + InitialScrapeOffset: tcase.initialScrapeOffset, + } + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + defer cleanupConns() + + // Wait for the initial scrape to happen exactly at t=0. + synctest.Wait() + + // Fast-forward fake time to simulate scheduled scrapes before shutdown. + if tcase.runDuration > 0 { + time.Sleep(tcase.runDuration) + synctest.Wait() + } + + // Stop the manager. This triggers the ScrapeOnShutdown logic synchronously. + scrapeManager.Stop() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal) + }) + }) + } +} diff --git a/scrape/scrape.go b/scrape/scrape.go index abddd06603..55d0eaf70b 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -238,7 +238,6 @@ func (sp *scrapePool) SetScrapeFailureLogger(l FailureLogger) { func (sp *scrapePool) stop() { sp.mtx.Lock() defer sp.mtx.Unlock() - sp.cancel() var wg sync.WaitGroup sp.targetMtx.Lock() @@ -258,6 +257,10 @@ func (sp *scrapePool) stop() { sp.targetMtx.Unlock() wg.Wait() + // Cancel the context after all loops have stopped. This is required for + // scrapeOnShutdown to work properly, as the shutdown scrape uses this + // context (via sl.parentCtx) and would fail if the context was cancelled early. + sp.cancel() sp.client.CloseIdleConnections() if sp.config != nil { @@ -820,11 +823,17 @@ type cacheEntry struct { } type scrapeLoop struct { - // Parameters. - ctx context.Context - cancel func() - stopped chan struct{} - parentCtx context.Context + // ctx represents a local context that is cancellable via s.cancel. + // It's meant to synchronize run() with stop(). + // It inherits parentCtx. + ctx context.Context + cancel func() + stopped chan struct{} + // parentCtx represents manager-level context, typically connected + // to process shutdown. + parentCtx context.Context + // appenderCtx is a parentCtx with some extra context for appender + // implementations. Potentially remove-able with removal of AppenderV1. appenderCtx context.Context l *slog.Logger cache *scrapeCache @@ -865,8 +874,9 @@ type scrapeLoop struct { reportExtraMetrics bool appendMetadataToWAL bool passMetadataInContext bool - skipOffsetting bool // For testability. - + skipJitterOffsetting bool // For testability. + scrapeOnShutdown bool + initialScrapeOffset time.Duration // error injection through setForcedError. forcedErr error forcedErrMtx sync.Mutex @@ -1218,7 +1228,9 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop { enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels, appendMetadataToWAL: opts.sp.options.AppendMetadata, passMetadataInContext: opts.sp.options.PassMetadataInContext, - skipOffsetting: opts.sp.options.skipOffsetting, + skipJitterOffsetting: opts.sp.options.skipJitterOffsetting, + scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown, + initialScrapeOffset: opts.sp.options.InitialScrapeOffset, } } @@ -1231,31 +1243,49 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) { sl.scrapeFailureLogger = l } +func (sl *scrapeLoop) getScrapeOffset() time.Duration { + offset := sl.scraper.offset(sl.interval, sl.offsetSeed) + if sl.skipJitterOffsetting { + offset = time.Duration(0) + } + return sl.initialScrapeOffset + offset +} + func (sl *scrapeLoop) run(errc chan<- error) { - if !sl.skipOffsetting { + var ( + last time.Time + alignedScrapeTime = time.Now().Round(0) + ticker = time.NewTicker(sl.interval) + ) + defer func() { + if sl.scrapeOnShutdown { + last = sl.scrapeAndReport(last, time.Now().Round(0), errc) + } + // Let the stop() know it can continue. + close(sl.stopped) + if sl.parentCtx.Err() == nil { + if !sl.disabledEndOfRunStalenessMarkers.Load() { + sl.endOfRunStaleness(last, ticker, sl.interval) + } + } + ticker.Stop() + }() + + // Initial offset and jitter offset, if any. + offset := sl.getScrapeOffset() + if offset > 0 { select { - case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): + case <-time.After(offset): // Continue after a scraping offset. case <-sl.ctx.Done(): - close(sl.stopped) return } } - var last time.Time - - alignedScrapeTime := time.Now().Round(0) - ticker := time.NewTicker(sl.interval) - defer ticker.Stop() - -mainLoop: for { select { - case <-sl.parentCtx.Done(): - close(sl.stopped) - return case <-sl.ctx.Done(): - break mainLoop + return default: } @@ -1282,20 +1312,11 @@ mainLoop: last = sl.scrapeAndReport(last, scrapeTime, errc) select { - case <-sl.parentCtx.Done(): - close(sl.stopped) - return case <-sl.ctx.Done(): - break mainLoop + return case <-ticker.C: } } - - close(sl.stopped) - - if !sl.disabledEndOfRunStalenessMarkers.Load() { - sl.endOfRunStaleness(last, ticker, sl.interval) - } } func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 7b0cd022dd..63547869be 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -6653,7 +6653,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) { } sa := selectAppendable(s, appV2) - sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipJitterOffsetting: true}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop()