diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index ac7964ee51..1db229561d 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -99,8 +99,6 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo validationScheme: model.UTF8Validation, symbolTable: labels.NewSymbolTable(), appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off. - initialScrapeOffset: time.Duration(0), - scrapeOnShutdown: false, } for _, o := range opts { o(sl) diff --git a/scrape/manager.go b/scrape/manager.go index c6ed865683..e632b015d7 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -126,10 +126,7 @@ type Options struct { // FeatureRegistry is the registry for tracking enabled/disabled features. FeatureRegistry features.Collector - // private option for testability. - skipOffsetting bool - - // Option to allow a final scrape before the manager closes. This is useful + // ScrapeOnShutdown enables a final scrape before the manager closes. This is useful // for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless // job scenarios, allowing an extra scrape for the short-living edge cases. // @@ -151,6 +148,9 @@ type Options struct { // and ensures backend rate limits don't drop valuable shutdown scrapes // because of an early startup scrape. InitialScrapeOffset time.Duration + + // private option for testability. + skipJitterOffsetting bool } // Manager maintains a set of scrape pools and manages start/stop cycles @@ -343,7 +343,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { // setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using // a global singleflight goroutine. This cross-boundary communication breaks // synctest's isolation bubble and causes a fatal panic. - if m.opts.skipOffsetting { + if m.opts.skipJitterOffsetting { m.offsetSeed = 0 } else { if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 534bbeefa6..568f567c52 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -14,12 +14,10 @@ package scrape import ( - "bufio" "bytes" "context" "errors" "fmt" - "io" "maps" "net" "net/http" @@ -40,7 +38,6 @@ import ( "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" - "github.com/prometheus/prometheus/util/testutil/synctest" "github.com/stretchr/testify/require" "go.yaml.in/yaml/v2" "google.golang.org/protobuf/types/known/timestamppb" @@ -58,6 +55,7 @@ import ( "github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestPopulateLabels(t *testing.T) { @@ -772,7 +770,7 @@ func TestManagerSTZeroIngestion(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: testSTZeroIngest, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -958,7 +956,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1070,7 +1068,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: true, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1602,28 +1600,98 @@ scrape_configs: } } +// pipeListener is an in-memory net.Listener that connects a custom DialContext +// directly to the httptest Server without opening real OS ports. +type pipeListener struct { + conns chan net.Conn + closed chan struct{} + once sync.Once +} + +func newPipeListener() *pipeListener { + return &pipeListener{ + conns: make(chan net.Conn), + closed: make(chan struct{}), + } +} + +func (l *pipeListener) Accept() (net.Conn, error) { + select { + case c := <-l.conns: + return c, nil + case <-l.closed: + return nil, net.ErrClosed + } +} + +func (l *pipeListener) Close() error { + l.once.Do(func() { close(l.closed) }) + return nil +} + +// Dummy Addr implementation to satisfy the net.Listener interface. +type pipeAddr struct{} + +func (pipeAddr) Network() string { return "pipe" } +func (pipeAddr) String() string { return "pipe" } +func (*pipeListener) Addr() net.Addr { return pipeAddr{} } + +// startFakeHTTPServer spins up a httptest.Server bound to an in-memory +// pipeListener. It returns the listener (to be wired to a custom dialer) and a +// cleanup function to shut down the server. +func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) { + t.Helper() + + listener := newPipeListener() + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Abort if the request context is canceled (e.g., due to a scrape timeout). + select { + case <-r.Context().Done(): + return + default: + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintln(w, "expected_metric 1") + } + }) + + srv := httptest.NewUnstartedServer(handler) + srv.Listener = listener + + // Background goroutines inherit the synctest bubble safely + srv.Start() + + return listener, srv.Close +} + // setupSynctestManager abstracts the boilerplate of creating a mock network, // starting the fake HTTP server, and configuring the scrape manager for synctest. func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { t.Helper() app := teststorage.NewAppendable() - srvConn, cliConn := net.Pipe() - - cleanup := func() { - srvConn.Close() - cliConn.Close() - } - - go startFakeHTTPServer(t, srvConn) + listener, cleanup := startFakeHTTPServer(t) if opts == nil { opts = &Options{} } - opts.skipOffsetting = true // Eliminates random jitter, making timing exact + opts.skipJitterOffsetting = true + + // Ensure the scraper creates a new net.Pipe on every dial attempt + // and hands the server-side connection to the mock server's listener. opts.HTTPClientOptions = []config_util.HTTPClientOption{ - config_util.WithDialContextFunc(func(ctx context.Context, network, addr string) (net.Conn, error) { - return cliConn, nil + config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { + srvConn, cliConn := net.Pipe() + + select { + case listener.conns <- srvConn: + // Give the client side to the scraper + return cliConn, nil + case <-listener.closed: + return nil, net.ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + } }), } @@ -1661,47 +1729,6 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) ( return scrapeManager, app, cleanup } -// Helper function to act as a fake HTTP server over a net.Conn -func startFakeHTTPServer(t *testing.T, conn net.Conn) { - t.Helper() - reader := bufio.NewReader(conn) - for { - req, err := http.ReadRequest(reader) - if err != nil { - // net.Pipe returns io.ErrClosedPipe when closed during test teardown. - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { - return - } - t.Errorf("fake HTTP server failed to read request: %v", err) - return - } - - _, err = io.Copy(io.Discard, req.Body) - req.Body.Close() - if err != nil { - t.Errorf("fake HTTP server failed to read request body: %v", err) - return - } - - body := "expected_metric 1\n" - - response := fmt.Sprintf("HTTP/1.1 200 OK\r\n"+ - "Content-Type: text/plain; version=0.0.4\r\n"+ - "Content-Length: %d\r\n"+ - "\r\n"+ - "%s", len(body), body) - - _, err = conn.Write([]byte(response)) - if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { - return - } - t.Errorf("fake HTTP server failed to write response: %v", err) - return - } - } -} - func TestManager_InitialScrapeOffset(t *testing.T) { interval := 10 * time.Second diff --git a/scrape/scrape.go b/scrape/scrape.go index b2af698043..c73d26a262 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -878,7 +878,7 @@ type scrapeLoop struct { reportExtraMetrics bool appendMetadataToWAL bool passMetadataInContext bool - skipOffsetting bool // For testability. + skipJitterOffsetting bool // For testability. scrapeOnShutdown bool initialScrapeOffset time.Duration // error injection through setForcedError. @@ -1233,7 +1233,7 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop { enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels, appendMetadataToWAL: opts.sp.options.AppendMetadata, passMetadataInContext: opts.sp.options.PassMetadataInContext, - skipOffsetting: opts.sp.options.skipOffsetting, + skipJitterOffsetting: opts.sp.options.skipJitterOffsetting, scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown, initialScrapeOffset: opts.sp.options.InitialScrapeOffset, } @@ -1248,9 +1248,9 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) { sl.scrapeFailureLogger = l } -func getScrapeOffset(sl *scrapeLoop) time.Duration { +func (sl *scrapeLoop) getScrapeOffset() time.Duration { offset := sl.scraper.offset(sl.interval, sl.offsetSeed) - if sl.skipOffsetting { + if sl.skipJitterOffsetting { offset = time.Duration(0) } return sl.initialScrapeOffset + offset @@ -1258,7 +1258,7 @@ func getScrapeOffset(sl *scrapeLoop) time.Duration { func (sl *scrapeLoop) run(errc chan<- error) { select { - case <-time.After(getScrapeOffset(sl)): + case <-time.After(sl.getScrapeOffset()): // Continue after a scraping offset. case <-sl.shutdownScrape: sl.cancel() @@ -1533,7 +1533,12 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // returned. Cancel the context to stop all writes. func (sl *scrapeLoop) stop() { if sl.scrapeOnShutdown { - sl.shutdownScrape <- struct{}{} + select { + case sl.shutdownScrape <- struct{}{}: + case <-sl.stopped: + // Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop + // has already exited, a direct send will block forever. + } } else { sl.cancel() } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index cab2b2918a..72ef0f8f4a 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -6652,7 +6652,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) { } sa := selectAppendable(s, appV2) - sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipJitterOffsetting: true}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop()