test(scrape): use httptest for mock server to respect context cancellation

- Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`.
- Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports.
- Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts.
- Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios.
- Renamed `skipOffsetting` to `skipJitterOffsetting`.
- Addressed other PR comments.

Signed-off-by: avilevy <avilevy@google.com>
This commit is contained in:
avilevy 2026-03-10 17:45:13 +00:00
parent 6828a6bdda
commit ce22adc8b6
No known key found for this signature in database
5 changed files with 102 additions and 72 deletions

View file

@ -99,8 +99,6 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
validationScheme: model.UTF8Validation,
symbolTable: labels.NewSymbolTable(),
appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off.
initialScrapeOffset: time.Duration(0),
scrapeOnShutdown: false,
}
for _, o := range opts {
o(sl)

View file

@ -126,10 +126,7 @@ type Options struct {
// FeatureRegistry is the registry for tracking enabled/disabled features.
FeatureRegistry features.Collector
// private option for testability.
skipOffsetting bool
// Option to allow a final scrape before the manager closes. This is useful
// ScrapeOnShutdown enables a final scrape before the manager closes. This is useful
// for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless
// job scenarios, allowing an extra scrape for the short-living edge cases.
//
@ -151,6 +148,9 @@ type Options struct {
// and ensures backend rate limits don't drop valuable shutdown scrapes
// because of an early startup scrape.
InitialScrapeOffset time.Duration
// private option for testability.
skipJitterOffsetting bool
}
// Manager maintains a set of scrape pools and manages start/stop cycles
@ -343,7 +343,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
// setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using
// a global singleflight goroutine. This cross-boundary communication breaks
// synctest's isolation bubble and causes a fatal panic.
if m.opts.skipOffsetting {
if m.opts.skipJitterOffsetting {
m.offsetSeed = 0
} else {
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {

View file

@ -14,12 +14,10 @@
package scrape
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"maps"
"net"
"net/http"
@ -40,7 +38,6 @@ import (
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/util/testutil/synctest"
"github.com/stretchr/testify/require"
"go.yaml.in/yaml/v2"
"google.golang.org/protobuf/types/known/timestamppb"
@ -58,6 +55,7 @@ import (
"github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/util/testutil/synctest"
)
func TestPopulateLabels(t *testing.T) {
@ -772,7 +770,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: testSTZeroIngest,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -958,7 +956,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1070,7 +1068,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: true,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1602,28 +1600,98 @@ scrape_configs:
}
}
// pipeListener is an in-memory net.Listener that connects a custom DialContext
// directly to the httptest Server without opening real OS ports.
type pipeListener struct {
conns chan net.Conn
closed chan struct{}
once sync.Once
}
func newPipeListener() *pipeListener {
return &pipeListener{
conns: make(chan net.Conn),
closed: make(chan struct{}),
}
}
func (l *pipeListener) Accept() (net.Conn, error) {
select {
case c := <-l.conns:
return c, nil
case <-l.closed:
return nil, net.ErrClosed
}
}
func (l *pipeListener) Close() error {
l.once.Do(func() { close(l.closed) })
return nil
}
// Dummy Addr implementation to satisfy the net.Listener interface.
type pipeAddr struct{}
func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string { return "pipe" }
func (*pipeListener) Addr() net.Addr { return pipeAddr{} }
// startFakeHTTPServer spins up a httptest.Server bound to an in-memory
// pipeListener. It returns the listener (to be wired to a custom dialer) and a
// cleanup function to shut down the server.
func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) {
t.Helper()
listener := newPipeListener()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Abort if the request context is canceled (e.g., due to a scrape timeout).
select {
case <-r.Context().Done():
return
default:
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
fmt.Fprintln(w, "expected_metric 1")
}
})
srv := httptest.NewUnstartedServer(handler)
srv.Listener = listener
// Background goroutines inherit the synctest bubble safely
srv.Start()
return listener, srv.Close
}
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) {
t.Helper()
app := teststorage.NewAppendable()
srvConn, cliConn := net.Pipe()
cleanup := func() {
srvConn.Close()
cliConn.Close()
}
go startFakeHTTPServer(t, srvConn)
listener, cleanup := startFakeHTTPServer(t)
if opts == nil {
opts = &Options{}
}
opts.skipOffsetting = true // Eliminates random jitter, making timing exact
opts.skipJitterOffsetting = true
// Ensure the scraper creates a new net.Pipe on every dial attempt
// and hands the server-side connection to the mock server's listener.
opts.HTTPClientOptions = []config_util.HTTPClientOption{
config_util.WithDialContextFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
return cliConn, nil
config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
srvConn, cliConn := net.Pipe()
select {
case listener.conns <- srvConn:
// Give the client side to the scraper
return cliConn, nil
case <-listener.closed:
return nil, net.ErrClosed
case <-ctx.Done():
return nil, ctx.Err()
}
}),
}
@ -1661,47 +1729,6 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (
return scrapeManager, app, cleanup
}
// Helper function to act as a fake HTTP server over a net.Conn
func startFakeHTTPServer(t *testing.T, conn net.Conn) {
t.Helper()
reader := bufio.NewReader(conn)
for {
req, err := http.ReadRequest(reader)
if err != nil {
// net.Pipe returns io.ErrClosedPipe when closed during test teardown.
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
return
}
t.Errorf("fake HTTP server failed to read request: %v", err)
return
}
_, err = io.Copy(io.Discard, req.Body)
req.Body.Close()
if err != nil {
t.Errorf("fake HTTP server failed to read request body: %v", err)
return
}
body := "expected_metric 1\n"
response := fmt.Sprintf("HTTP/1.1 200 OK\r\n"+
"Content-Type: text/plain; version=0.0.4\r\n"+
"Content-Length: %d\r\n"+
"\r\n"+
"%s", len(body), body)
_, err = conn.Write([]byte(response))
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
return
}
t.Errorf("fake HTTP server failed to write response: %v", err)
return
}
}
}
func TestManager_InitialScrapeOffset(t *testing.T) {
interval := 10 * time.Second

View file

@ -878,7 +878,7 @@ type scrapeLoop struct {
reportExtraMetrics bool
appendMetadataToWAL bool
passMetadataInContext bool
skipOffsetting bool // For testability.
skipJitterOffsetting bool // For testability.
scrapeOnShutdown bool
initialScrapeOffset time.Duration
// error injection through setForcedError.
@ -1233,7 +1233,7 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
appendMetadataToWAL: opts.sp.options.AppendMetadata,
passMetadataInContext: opts.sp.options.PassMetadataInContext,
skipOffsetting: opts.sp.options.skipOffsetting,
skipJitterOffsetting: opts.sp.options.skipJitterOffsetting,
scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown,
initialScrapeOffset: opts.sp.options.InitialScrapeOffset,
}
@ -1248,9 +1248,9 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
sl.scrapeFailureLogger = l
}
func getScrapeOffset(sl *scrapeLoop) time.Duration {
func (sl *scrapeLoop) getScrapeOffset() time.Duration {
offset := sl.scraper.offset(sl.interval, sl.offsetSeed)
if sl.skipOffsetting {
if sl.skipJitterOffsetting {
offset = time.Duration(0)
}
return sl.initialScrapeOffset + offset
@ -1258,7 +1258,7 @@ func getScrapeOffset(sl *scrapeLoop) time.Duration {
func (sl *scrapeLoop) run(errc chan<- error) {
select {
case <-time.After(getScrapeOffset(sl)):
case <-time.After(sl.getScrapeOffset()):
// Continue after a scraping offset.
case <-sl.shutdownScrape:
sl.cancel()
@ -1533,7 +1533,12 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// returned. Cancel the context to stop all writes.
func (sl *scrapeLoop) stop() {
if sl.scrapeOnShutdown {
sl.shutdownScrape <- struct{}{}
select {
case sl.shutdownScrape <- struct{}{}:
case <-sl.stopped:
// Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop
// has already exited, a direct send will block forever.
}
} else {
sl.cancel()
}

View file

@ -6652,7 +6652,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) {
}
sa := selectAppendable(s, appV2)
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipJitterOffsetting: true}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()