From 2aebd269bc16c051dfb72d532db7c4e7ff76a493 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 4 Mar 2026 14:51:26 +0100 Subject: [PATCH 1/4] UI: Fix tooltip Y-offset drift for multiple graph panels getBoundingClientRect() was cached in the setSize hook, which only fires on chart creation/resize. The cached viewport-relative coordinates became stale after scrolling, causing the tooltip to appear increasingly offset on charts further down the page. Fixed by calling getBoundingClientRect() on every setCursor invocation to always get accurate viewport-relative coordinates. Signed-off-by: Julius Volz --- .../src/pages/query/uPlotChartHelpers.ts | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/web/ui/mantine-ui/src/pages/query/uPlotChartHelpers.ts b/web/ui/mantine-ui/src/pages/query/uPlotChartHelpers.ts index ba6cdbae41..816ddf7578 100644 --- a/web/ui/mantine-ui/src/pages/query/uPlotChartHelpers.ts +++ b/web/ui/mantine-ui/src/pages/query/uPlotChartHelpers.ts @@ -83,8 +83,6 @@ const formatLabels = (labels: { [key: string]: string }): string => ` const tooltipPlugin = (useLocalTime: boolean, data: AlignedData) => { let over: HTMLDivElement; - let boundingLeft: number; - let boundingTop: number; let selectedSeriesIdx: number | null = null; const overlay = document.createElement("div"); @@ -111,12 +109,6 @@ const tooltipPlugin = (useLocalTime: boolean, data: AlignedData) => { destroy: () => { overlay.remove(); }, - // When the chart is resized, store the bounding box of the overlay. - setSize: () => { - const bbox = over.getBoundingClientRect(); - boundingLeft = bbox.left; - boundingTop = bbox.top; - }, // When a series is selected by hovering close to it, store the // index of the selected series, so we can update the hover tooltip // in setCursor. @@ -150,8 +142,12 @@ const tooltipPlugin = (useLocalTime: boolean, data: AlignedData) => { } const color = series.stroke(u, selectedSeriesIdx); - const x = left + boundingLeft; - const y = top + boundingTop; + // Get the bounding rect fresh on every cursor move to account for + // page scrolling, which would otherwise cause a growing Y offset + // for charts further down the page. + const bbox = over.getBoundingClientRect(); + const x = left + bbox.left; + const y = top + bbox.top; overlay.innerHTML = `
${formatTimestamp(ts, useLocalTime)}
From 6b5c0b327a5ec706029dfebb7bd7bbd6d1611ce6 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 16 Mar 2026 13:08:47 +0100 Subject: [PATCH 2/4] tsdb: mmap histogram chunks during WAL replay (#18306) * tsdb: mmap histogram chunks during WAL replay The float sample path in processWALSamples calls mmapChunks when a new chunk is created during WAL replay, but the histogram path was missing this call. Without it, histogram head chunks accumulate as a linked list in memory rather than being mmapped, causing unnecessary memory growth during long WAL replays. --------- Signed-off-by: Arve Knudsen --- tsdb/head_test.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++ tsdb/head_wal.go | 1 + 2 files changed, 88 insertions(+) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 132d01c764..56f3b70f5e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5654,6 +5654,93 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { require.NoError(t, h.Close()) } +// TestWALReplayMmapsChunks is a regression test ensuring that +// chunks are mmapped during WAL replay, not accumulated in the +// in-memory linked list. +func TestWALReplayMmapsChunks(t *testing.T) { + for _, tc := range []struct { + name string + append func(app storage.Appender, l labels.Labels, ts int64) error + }{ + { + name: "floats", + append: func(app storage.Appender, l labels.Labels, ts int64) error { + _, err := app.Append(0, l, ts, float64(ts)) + return err + }, + }, + { + name: "histograms", + append: func(app storage.Appender, l labels.Labels, ts int64) error { + _, err := app.AppendHistogram(0, l, ts, tsdbutil.GenerateTestHistogram(ts), nil) + return err + }, + }, + { + name: "float histograms", + append: func(app storage.Appender, l labels.Labels, ts int64) error { + _, err := app.AppendHistogram(0, l, ts, nil, tsdbutil.GenerateTestFloatHistogram(ts)) + return err + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = dir + + h, err := NewHead(nil, nil, wal, nil, opts, nil) + require.NoError(t, err) + require.NoError(t, h.Init(0)) + + l := labels.FromStrings("foo", "bar") + + // Append 250 samples at 1-minute intervals. With ChunkRange=1000 and + // DefaultSamplesPerChunk=120, this creates multiple chunks that should + // be mmapped during WAL replay. + app := h.Appender(context.Background()) + for i := range 250 { + require.NoError(t, tc.append(app, l, int64(i)*time.Minute.Milliseconds())) + } + require.NoError(t, app.Commit()) + + require.NoError(t, h.Close()) + + // Remove mmapped chunk files so WAL replay must recreate all chunks. + require.NoError(t, os.RemoveAll(filepath.Join(dir, "chunks_head"))) + + // Reopen Head — WAL replay happens in Init. + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) + require.NoError(t, err) + h, err = NewHead(nil, nil, wal, nil, opts, nil) + require.NoError(t, err) + require.NoError(t, h.Init(0)) + + ms, ok, err := h.getOrCreate(l.Hash(), l, false) + require.NoError(t, err) + require.False(t, ok) + require.NotNil(t, ms) + + // Chunks must be mmapped during replay, not left in the head linked list. + require.NotEmpty(t, ms.mmappedChunks, "expected chunks to be mmapped during WAL replay") + require.Equal(t, 1, ms.headChunks.len(), "expected only one head chunk after replay") + + // Verify each mmapped chunk is readable from disk. + for _, m := range ms.mmappedChunks { + chk, err := h.chunkDiskMapper.Chunk(m.ref) + require.NoError(t, err) + require.Equal(t, int(m.numSamples), chk.NumSamples()) + } + + require.NoError(t, h.Close()) + }) + } +} + func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { h, _ := newTestHead(t, 1000, compression.None, false) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 0581b9306e..5a341b6ab4 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -721,6 +721,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() + _ = ms.mmapChunks(h.chunkDiskMapper) } if s.t > maxt { maxt = s.t From c669470f079bed927cc58ecbc38a4d8bd6ebef27 Mon Sep 17 00:00:00 2001 From: mihir-dixit2k27 <143348248+mihir-dixit2k27@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:39:54 +0530 Subject: [PATCH 3/4] docs: add HTTP SD integrations cross-reference (#18278) * docs: add HTTP SD integrations cross-reference Signed-off-by: Mihir Dixit * docs: add HTTP SD integrations cross-reference to configuration.md Signed-off-by: Mihir Dixit --------- Signed-off-by: Mihir Dixit --- docs/configuration/configuration.md | 4 ++++ docs/http_sd.md | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 9f03c000ef..334c5da490 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -2314,6 +2314,10 @@ Each target has a meta label `__meta_url` during the [relabeling phase](#relabel_config). Its value is set to the URL from which the target was extracted. +There is a list of +[integrations](https://prometheus.io/docs/operating/integrations/#http-service-discovery) with this +discovery mechanism. + ```yaml # URL from which the targets are fetched. url: diff --git a/docs/http_sd.md b/docs/http_sd.md index d329ce07af..380c2da245 100644 --- a/docs/http_sd.md +++ b/docs/http_sd.md @@ -95,3 +95,7 @@ Examples: } ] ``` + +## HTTP SD integrations + +A list of existing HTTP SD integrations can be found on the [Integrations page](https://prometheus.io/docs/operating/integrations/#http-service-discovery) in the Prometheus documentation. From bdfb3fc2327b49392abb948334b38e35113dda84 Mon Sep 17 00:00:00 2001 From: avilevy18 <105948922+avilevy18@users.noreply.github.com> Date: Tue, 17 Mar 2026 06:02:11 -0400 Subject: [PATCH 4/4] scrape: add option to manager to allow scraping at shutdown; add initial offset option (#18067) * Adding scape on shutdown Signed-off-by: avilevy * scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely Signed-off-by: avilevy * renamed calculateScrapeOffset to getScrapeOffset Signed-off-by: avilevy * test(scrape): refactor time-based manager tests to use synctest Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests. Add TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown to use the testing/synctest package, completely eliminating real-world time.Sleep delays and making the assertions 100% deterministic. - Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble. - Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable. - Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest. - Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven. Signed-off-by: avilevy * Clarify use cases in InitialScrapeOffset comment Signed-off-by: avilevy * test(scrape): use httptest for mock server to respect context cancellation - Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`. - Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports. - Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts. - Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios. - Renamed `skipOffsetting` to `skipJitterOffsetting`. - Addressed other PR comments. Signed-off-by: avilevy * tmp Signed-off-by: bwplotka * exp2 Signed-off-by: bwplotka * fix Signed-off-by: bwplotka * scrape: fix scrapeOnShutdown context bug and refactor test helpers The scrapeOnShutdown feature was failing during manager shutdown because the scrape pool context was being cancelled before the final shutdown scrapes could execute. Fix this by delaying context cancellation in scrapePool.stop() until after all scrape loops have stopped. In addition: - Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset. - Refactored network test helper functions from manager_test.go to helpers_test.go. - Addressed other comments. Signed-off-by: avilevy * Update scrape/scrape.go Co-authored-by: Bartlomiej Plotka Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com> --------- Signed-off-by: avilevy Signed-off-by: bwplotka Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com> Co-authored-by: bwplotka --- scrape/helpers_test.go | 136 +++++++++++++++++++++++++++++++++++++++++ scrape/manager.go | 37 ++++++++++- scrape/manager_test.go | 123 +++++++++++++++++++++++++++++++++++-- scrape/scrape.go | 87 ++++++++++++++++---------- scrape/scrape_test.go | 2 +- 5 files changed, 344 insertions(+), 41 deletions(-) diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 1db229561d..45c89ad8d7 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -18,17 +18,24 @@ import ( "context" "encoding/binary" "fmt" + "net" "net/http" + "net/http/httptest" + "sync" "testing" "time" "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" + "go.yaml.in/yaml/v2" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -233,3 +240,132 @@ func TestSelectAppendable(t *testing.T) { } }) } + +// pipeListener is an in-memory net.Listener that connects a custom DialContext +// directly to the httptest Server without opening real OS ports. +type pipeListener struct { + conns chan net.Conn + closed chan struct{} + once sync.Once +} + +func newPipeListener() *pipeListener { + return &pipeListener{ + conns: make(chan net.Conn), + closed: make(chan struct{}), + } +} + +func (l *pipeListener) Accept() (net.Conn, error) { + select { + case c := <-l.conns: + return c, nil + case <-l.closed: + return nil, net.ErrClosed + } +} + +func (l *pipeListener) Close() error { + l.once.Do(func() { close(l.closed) }) + return nil +} + +// Dummy Addr implementation to satisfy the net.Listener interface. +type pipeAddr struct{} + +func (pipeAddr) Network() string { return "pipe" } +func (pipeAddr) String() string { return "pipe" } +func (*pipeListener) Addr() net.Addr { return pipeAddr{} } + +// startFakeHTTPServer spins up a httptest.Server bound to an in-memory +// pipeListener. It returns the listener (to be wired to a custom dialer) and a +// cleanup function to shut down the server. +func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) { + t.Helper() + + listener := newPipeListener() + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Abort if the request context is canceled (e.g., due to a scrape timeout). + select { + case <-r.Context().Done(): + return + default: + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintln(w, "expected_metric 1") + } + }) + + srv := httptest.NewUnstartedServer(handler) + srv.Listener = listener + + // Background goroutines inherit the synctest bubble safely. + srv.Start() + + return listener, srv.Close +} + +// setupSynctestManager abstracts the boilerplate of creating a mock network, +// starting the fake HTTP server, and configuring the scrape manager for synctest. +func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { + t.Helper() + app := teststorage.NewAppendable() + + listener, cleanup := startFakeHTTPServer(t) + + if opts == nil { + opts = &Options{} + } + opts.skipJitterOffsetting = true + + // Ensure the scraper creates a new net.Pipe on every dial attempt + // and hands the server-side connection to the mock server's listener. + opts.HTTPClientOptions = []config_util.HTTPClientOption{ + config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { + srvConn, cliConn := net.Pipe() + + select { + case listener.conns <- srvConn: + // Give the client side to the scraper. + return cliConn, nil + case <-listener.closed: + return nil, net.ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + } + }), + } + + scrapeManager, err := NewManager( + opts, + promslog.New(&promslog.Config{}), + nil, nil, app, prometheus.NewRegistry(), + ) + require.NoError(t, err) + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(interval), + ScrapeTimeout: model.Duration(interval), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: "http", + model.AddressLabel: "test.local", + }}, + }}, + }) + + scrapeManager.reload() + + return scrapeManager, app, cleanup +} diff --git a/scrape/manager.go b/scrape/manager.go index 24a63b056b..e632b015d7 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -126,8 +126,31 @@ type Options struct { // FeatureRegistry is the registry for tracking enabled/disabled features. FeatureRegistry features.Collector + // ScrapeOnShutdown enables a final scrape before the manager closes. This is useful + // for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless + // job scenarios, allowing an extra scrape for the short-living edge cases. + // + // NOTE: This final scrape ignores the configured scrape interval. + ScrapeOnShutdown bool + + // InitialScrapeOffset applies an additional baseline delay before we begin + // scraping targets. By default, Prometheus calculates a specific offset for + // each target to spread the scraping load evenly across the server. Configuring + // this option adds a fixed duration to that target-specific offset. This allows + // tuning the initial startup delay without overriding the underlying target + // jitter, preserving proper load balancing across the scraper pools. + // + // Setting this offset (e.g., to 10s) is particularly useful in Prometheus + // agent mode and OTel's prometheusreceiver when used in serverless job + // scenarios. It helps avoid readiness races where targets might not be fully + // initialized immediately upon startup. It also prevents capturing + // intermediate state (such as applications crashing shortly after booting), + // and ensures backend rate limits don't drop valuable shutdown scrapes + // because of an early startup scrape. + InitialScrapeOffset time.Duration + // private option for testability. - skipOffsetting bool + skipJitterOffsetting bool } // Manager maintains a set of scrape pools and manages start/stop cycles @@ -316,8 +339,16 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.scrapeFailureLoggers = scrapeFailureLoggers - if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { - return err + // Skip offset seed calculation during tests. + // setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using + // a global singleflight goroutine. This cross-boundary communication breaks + // synctest's isolation bubble and causes a fatal panic. + if m.opts.skipJitterOffsetting { + m.offsetSeed = 0 + } else { + if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { + return err + } } // Cleanup and reload pool if the configuration has changed. diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 395cc98a82..3dc05db011 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -53,6 +53,7 @@ import ( "github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestPopulateLabels(t *testing.T) { @@ -767,7 +768,7 @@ func TestManagerSTZeroIngestion(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: testSTZeroIngest, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -953,7 +954,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1065,7 +1066,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) { app := teststorage.NewAppendable() discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: true, - skipOffsetting: true, + skipJitterOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1584,7 +1585,7 @@ scrape_configs: // Disable end of run staleness markers for some targets. m.DisableEndOfRunStalenessMarkers("one", targetsToDisable) - // This should be a no-op + // This should be a no-op. m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable) // Check that the end of run staleness markers are disabled for the correct targets. @@ -1596,3 +1597,117 @@ scrape_configs: } } } + +func TestManager_InitialScrapeOffset(t *testing.T) { + interval := 10 * time.Second + + for _, tcase := range []struct { + name string + initialScrapeOffset time.Duration + runDuration time.Duration + expectedSamples int + }{ + { + name: "zero offset scrapes immediately", + expectedSamples: 1, + }, + { + name: "zero offset scrapes twice after one interval", + runDuration: interval, + expectedSamples: 2, + }, + { + name: "large offset prevents immediate scrape", + initialScrapeOffset: 1 * time.Hour, + runDuration: 59 * time.Minute, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset} + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + defer cleanupConns() + + // Wait for the scrape manager to block on its timers. + synctest.Wait() + + // Fast-forward the fake clock by the test case's run duration. + time.Sleep(tcase.runDuration) + synctest.Wait() + + // Stop the manager to clean up background goroutines. + scrapeManager.Stop() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples) + }) + }) + } +} + +func TestManager_ScrapeOnShutdown(t *testing.T) { + interval := 10 * time.Second + + for _, tcase := range []struct { + name string + scrapeOnShutdown bool + initialScrapeOffset time.Duration + runDuration time.Duration + expectedSamplesTotal int + }{ + { + name: "no scrape on shutdown", + scrapeOnShutdown: false, + expectedSamplesTotal: 1, + }, + { + name: "scrape on shutdown", + scrapeOnShutdown: true, + expectedSamplesTotal: 2, + }, + { + name: "scrape on shutdown after some scrapes", + scrapeOnShutdown: true, + runDuration: interval, + expectedSamplesTotal: 3, + }, + { + name: "scrape on shutdown with initial offset", + scrapeOnShutdown: true, + initialScrapeOffset: 10 * time.Second, + runDuration: 5 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "scrape on shutdown with short running instance (offset 5s)", + scrapeOnShutdown: true, + initialScrapeOffset: 5 * time.Second, + runDuration: 8 * time.Second, + expectedSamplesTotal: 2, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{ + ScrapeOnShutdown: tcase.scrapeOnShutdown, + InitialScrapeOffset: tcase.initialScrapeOffset, + } + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + defer cleanupConns() + + // Wait for the initial scrape to happen exactly at t=0. + synctest.Wait() + + // Fast-forward fake time to simulate scheduled scrapes before shutdown. + if tcase.runDuration > 0 { + time.Sleep(tcase.runDuration) + synctest.Wait() + } + + // Stop the manager. This triggers the ScrapeOnShutdown logic synchronously. + scrapeManager.Stop() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal) + }) + }) + } +} diff --git a/scrape/scrape.go b/scrape/scrape.go index abddd06603..55d0eaf70b 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -238,7 +238,6 @@ func (sp *scrapePool) SetScrapeFailureLogger(l FailureLogger) { func (sp *scrapePool) stop() { sp.mtx.Lock() defer sp.mtx.Unlock() - sp.cancel() var wg sync.WaitGroup sp.targetMtx.Lock() @@ -258,6 +257,10 @@ func (sp *scrapePool) stop() { sp.targetMtx.Unlock() wg.Wait() + // Cancel the context after all loops have stopped. This is required for + // scrapeOnShutdown to work properly, as the shutdown scrape uses this + // context (via sl.parentCtx) and would fail if the context was cancelled early. + sp.cancel() sp.client.CloseIdleConnections() if sp.config != nil { @@ -820,11 +823,17 @@ type cacheEntry struct { } type scrapeLoop struct { - // Parameters. - ctx context.Context - cancel func() - stopped chan struct{} - parentCtx context.Context + // ctx represents a local context that is cancellable via s.cancel. + // It's meant to synchronize run() with stop(). + // It inherits parentCtx. + ctx context.Context + cancel func() + stopped chan struct{} + // parentCtx represents manager-level context, typically connected + // to process shutdown. + parentCtx context.Context + // appenderCtx is a parentCtx with some extra context for appender + // implementations. Potentially remove-able with removal of AppenderV1. appenderCtx context.Context l *slog.Logger cache *scrapeCache @@ -865,8 +874,9 @@ type scrapeLoop struct { reportExtraMetrics bool appendMetadataToWAL bool passMetadataInContext bool - skipOffsetting bool // For testability. - + skipJitterOffsetting bool // For testability. + scrapeOnShutdown bool + initialScrapeOffset time.Duration // error injection through setForcedError. forcedErr error forcedErrMtx sync.Mutex @@ -1218,7 +1228,9 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop { enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels, appendMetadataToWAL: opts.sp.options.AppendMetadata, passMetadataInContext: opts.sp.options.PassMetadataInContext, - skipOffsetting: opts.sp.options.skipOffsetting, + skipJitterOffsetting: opts.sp.options.skipJitterOffsetting, + scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown, + initialScrapeOffset: opts.sp.options.InitialScrapeOffset, } } @@ -1231,31 +1243,49 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) { sl.scrapeFailureLogger = l } +func (sl *scrapeLoop) getScrapeOffset() time.Duration { + offset := sl.scraper.offset(sl.interval, sl.offsetSeed) + if sl.skipJitterOffsetting { + offset = time.Duration(0) + } + return sl.initialScrapeOffset + offset +} + func (sl *scrapeLoop) run(errc chan<- error) { - if !sl.skipOffsetting { + var ( + last time.Time + alignedScrapeTime = time.Now().Round(0) + ticker = time.NewTicker(sl.interval) + ) + defer func() { + if sl.scrapeOnShutdown { + last = sl.scrapeAndReport(last, time.Now().Round(0), errc) + } + // Let the stop() know it can continue. + close(sl.stopped) + if sl.parentCtx.Err() == nil { + if !sl.disabledEndOfRunStalenessMarkers.Load() { + sl.endOfRunStaleness(last, ticker, sl.interval) + } + } + ticker.Stop() + }() + + // Initial offset and jitter offset, if any. + offset := sl.getScrapeOffset() + if offset > 0 { select { - case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): + case <-time.After(offset): // Continue after a scraping offset. case <-sl.ctx.Done(): - close(sl.stopped) return } } - var last time.Time - - alignedScrapeTime := time.Now().Round(0) - ticker := time.NewTicker(sl.interval) - defer ticker.Stop() - -mainLoop: for { select { - case <-sl.parentCtx.Done(): - close(sl.stopped) - return case <-sl.ctx.Done(): - break mainLoop + return default: } @@ -1282,20 +1312,11 @@ mainLoop: last = sl.scrapeAndReport(last, scrapeTime, errc) select { - case <-sl.parentCtx.Done(): - close(sl.stopped) - return case <-sl.ctx.Done(): - break mainLoop + return case <-ticker.C: } } - - close(sl.stopped) - - if !sl.disabledEndOfRunStalenessMarkers.Load() { - sl.endOfRunStaleness(last, ticker, sl.interval) - } } func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 7b0cd022dd..63547869be 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -6653,7 +6653,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) { } sa := selectAppendable(s, appV2) - sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipJitterOffsetting: true}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop()