Merge branch 'main' into feature/start-time

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2026-03-17 13:06:25 +01:00 committed by GitHub
commit a02e20d98e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 446 additions and 51 deletions

View file

@ -2314,6 +2314,10 @@ Each target has a meta label `__meta_url` during the
[relabeling phase](#relabel_config). Its value is set to the
URL from which the target was extracted.
There is a list of
[integrations](https://prometheus.io/docs/operating/integrations/#http-service-discovery) with this
discovery mechanism.
```yaml
# URL from which the targets are fetched.
url: <string>

View file

@ -95,3 +95,7 @@ Examples:
}
]
```
## HTTP SD integrations
A list of existing HTTP SD integrations can be found on the [Integrations page](https://prometheus.io/docs/operating/integrations/#http-service-discovery) in the Prometheus documentation.

View file

@ -18,17 +18,24 @@ import (
"context"
"encoding/binary"
"fmt"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"go.yaml.in/yaml/v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -235,3 +242,132 @@ func TestSelectAppendable(t *testing.T) {
}
})
}
// pipeListener is an in-memory net.Listener that connects a custom DialContext
// directly to the httptest Server without opening real OS ports.
type pipeListener struct {
conns chan net.Conn
closed chan struct{}
once sync.Once
}
func newPipeListener() *pipeListener {
return &pipeListener{
conns: make(chan net.Conn),
closed: make(chan struct{}),
}
}
func (l *pipeListener) Accept() (net.Conn, error) {
select {
case c := <-l.conns:
return c, nil
case <-l.closed:
return nil, net.ErrClosed
}
}
func (l *pipeListener) Close() error {
l.once.Do(func() { close(l.closed) })
return nil
}
// Dummy Addr implementation to satisfy the net.Listener interface.
type pipeAddr struct{}
func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string { return "pipe" }
func (*pipeListener) Addr() net.Addr { return pipeAddr{} }
// startFakeHTTPServer spins up a httptest.Server bound to an in-memory
// pipeListener. It returns the listener (to be wired to a custom dialer) and a
// cleanup function to shut down the server.
func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) {
t.Helper()
listener := newPipeListener()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Abort if the request context is canceled (e.g., due to a scrape timeout).
select {
case <-r.Context().Done():
return
default:
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
fmt.Fprintln(w, "expected_metric 1")
}
})
srv := httptest.NewUnstartedServer(handler)
srv.Listener = listener
// Background goroutines inherit the synctest bubble safely.
srv.Start()
return listener, srv.Close
}
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) {
t.Helper()
app := teststorage.NewAppendable()
listener, cleanup := startFakeHTTPServer(t)
if opts == nil {
opts = &Options{}
}
opts.skipJitterOffsetting = true
// Ensure the scraper creates a new net.Pipe on every dial attempt
// and hands the server-side connection to the mock server's listener.
opts.HTTPClientOptions = []config_util.HTTPClientOption{
config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
srvConn, cliConn := net.Pipe()
select {
case listener.conns <- srvConn:
// Give the client side to the scraper.
return cliConn, nil
case <-listener.closed:
return nil, net.ErrClosed
case <-ctx.Done():
return nil, ctx.Err()
}
}),
}
scrapeManager, err := NewManager(
opts,
promslog.New(&promslog.Config{}),
nil, nil, app, prometheus.NewRegistry(),
)
require.NoError(t, err)
cfg := &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(interval),
ScrapeTimeout: model.Duration(interval),
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}
cfgText, err := yaml.Marshal(*cfg)
require.NoError(t, err)
cfg = loadConfiguration(t, string(cfgText))
require.NoError(t, scrapeManager.ApplyConfig(cfg))
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: "http",
model.AddressLabel: "test.local",
}},
}},
})
scrapeManager.reload()
return scrapeManager, app, cleanup
}

View file

@ -144,8 +144,31 @@ type Options struct {
// FeatureRegistry is the registry for tracking enabled/disabled features.
FeatureRegistry features.Collector
// ScrapeOnShutdown enables a final scrape before the manager closes. This is useful
// for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless
// job scenarios, allowing an extra scrape for the short-living edge cases.
//
// NOTE: This final scrape ignores the configured scrape interval.
ScrapeOnShutdown bool
// InitialScrapeOffset applies an additional baseline delay before we begin
// scraping targets. By default, Prometheus calculates a specific offset for
// each target to spread the scraping load evenly across the server. Configuring
// this option adds a fixed duration to that target-specific offset. This allows
// tuning the initial startup delay without overriding the underlying target
// jitter, preserving proper load balancing across the scraper pools.
//
// Setting this offset (e.g., to 10s) is particularly useful in Prometheus
// agent mode and OTel's prometheusreceiver when used in serverless job
// scenarios. It helps avoid readiness races where targets might not be fully
// initialized immediately upon startup. It also prevents capturing
// intermediate state (such as applications crashing shortly after booting),
// and ensures backend rate limits don't drop valuable shutdown scrapes
// because of an early startup scrape.
InitialScrapeOffset time.Duration
// private option for testability.
skipOffsetting bool
skipJitterOffsetting bool
}
// Manager maintains a set of scrape pools and manages start/stop cycles
@ -334,8 +357,16 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.scrapeFailureLoggers = scrapeFailureLoggers
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
// Skip offset seed calculation during tests.
// setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using
// a global singleflight goroutine. This cross-boundary communication breaks
// synctest's isolation bubble and causes a fatal panic.
if m.opts.skipJitterOffsetting {
m.offsetSeed = 0
} else {
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}
}
// Cleanup and reload pool if the configuration has changed.

View file

@ -53,6 +53,7 @@ import (
"github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/util/testutil/synctest"
)
func TestPopulateLabels(t *testing.T) {
@ -768,7 +769,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: testSTZeroIngest,
ParseST: testSTZeroIngest,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -955,7 +956,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
ParseST: tc.enableSTZeroIngestion,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1068,7 +1069,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: true,
ParseST: true,
skipOffsetting: true,
skipJitterOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1587,7 +1588,7 @@ scrape_configs:
// Disable end of run staleness markers for some targets.
m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
// This should be a no-op
// This should be a no-op.
m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)
// Check that the end of run staleness markers are disabled for the correct targets.
@ -1599,3 +1600,117 @@ scrape_configs:
}
}
}
func TestManager_InitialScrapeOffset(t *testing.T) {
interval := 10 * time.Second
for _, tcase := range []struct {
name string
initialScrapeOffset time.Duration
runDuration time.Duration
expectedSamples int
}{
{
name: "zero offset scrapes immediately",
expectedSamples: 1,
},
{
name: "zero offset scrapes twice after one interval",
runDuration: interval,
expectedSamples: 2,
},
{
name: "large offset prevents immediate scrape",
initialScrapeOffset: 1 * time.Hour,
runDuration: 59 * time.Minute,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
defer cleanupConns()
// Wait for the scrape manager to block on its timers.
synctest.Wait()
// Fast-forward the fake clock by the test case's run duration.
time.Sleep(tcase.runDuration)
synctest.Wait()
// Stop the manager to clean up background goroutines.
scrapeManager.Stop()
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples)
})
})
}
}
func TestManager_ScrapeOnShutdown(t *testing.T) {
interval := 10 * time.Second
for _, tcase := range []struct {
name string
scrapeOnShutdown bool
initialScrapeOffset time.Duration
runDuration time.Duration
expectedSamplesTotal int
}{
{
name: "no scrape on shutdown",
scrapeOnShutdown: false,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown",
scrapeOnShutdown: true,
expectedSamplesTotal: 2,
},
{
name: "scrape on shutdown after some scrapes",
scrapeOnShutdown: true,
runDuration: interval,
expectedSamplesTotal: 3,
},
{
name: "scrape on shutdown with initial offset",
scrapeOnShutdown: true,
initialScrapeOffset: 10 * time.Second,
runDuration: 5 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown with short running instance (offset 5s)",
scrapeOnShutdown: true,
initialScrapeOffset: 5 * time.Second,
runDuration: 8 * time.Second,
expectedSamplesTotal: 2,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{
ScrapeOnShutdown: tcase.scrapeOnShutdown,
InitialScrapeOffset: tcase.initialScrapeOffset,
}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
defer cleanupConns()
// Wait for the initial scrape to happen exactly at t=0.
synctest.Wait()
// Fast-forward fake time to simulate scheduled scrapes before shutdown.
if tcase.runDuration > 0 {
time.Sleep(tcase.runDuration)
synctest.Wait()
}
// Stop the manager. This triggers the ScrapeOnShutdown logic synchronously.
scrapeManager.Stop()
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal)
})
})
}
}

View file

@ -238,7 +238,6 @@ func (sp *scrapePool) SetScrapeFailureLogger(l FailureLogger) {
func (sp *scrapePool) stop() {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.cancel()
var wg sync.WaitGroup
sp.targetMtx.Lock()
@ -258,6 +257,10 @@ func (sp *scrapePool) stop() {
sp.targetMtx.Unlock()
wg.Wait()
// Cancel the context after all loops have stopped. This is required for
// scrapeOnShutdown to work properly, as the shutdown scrape uses this
// context (via sl.parentCtx) and would fail if the context was cancelled early.
sp.cancel()
sp.client.CloseIdleConnections()
if sp.config != nil {
@ -820,11 +823,17 @@ type cacheEntry struct {
}
type scrapeLoop struct {
// Parameters.
ctx context.Context
cancel func()
stopped chan struct{}
parentCtx context.Context
// ctx represents a local context that is cancellable via s.cancel.
// It's meant to synchronize run() with stop().
// It inherits parentCtx.
ctx context.Context
cancel func()
stopped chan struct{}
// parentCtx represents manager-level context, typically connected
// to process shutdown.
parentCtx context.Context
// appenderCtx is a parentCtx with some extra context for appender
// implementations. Potentially remove-able with removal of AppenderV1.
appenderCtx context.Context
l *slog.Logger
cache *scrapeCache
@ -866,8 +875,9 @@ type scrapeLoop struct {
reportExtraMetrics bool
appendMetadataToWAL bool
passMetadataInContext bool
skipOffsetting bool // For testability.
skipJitterOffsetting bool // For testability.
scrapeOnShutdown bool
initialScrapeOffset time.Duration
// error injection through setForcedError.
forcedErr error
forcedErrMtx sync.Mutex
@ -1224,7 +1234,9 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
appendMetadataToWAL: opts.sp.options.AppendMetadata,
passMetadataInContext: opts.sp.options.PassMetadataInContext,
skipOffsetting: opts.sp.options.skipOffsetting,
skipJitterOffsetting: opts.sp.options.skipJitterOffsetting,
scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown,
initialScrapeOffset: opts.sp.options.InitialScrapeOffset,
}
}
@ -1237,31 +1249,49 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
sl.scrapeFailureLogger = l
}
func (sl *scrapeLoop) getScrapeOffset() time.Duration {
offset := sl.scraper.offset(sl.interval, sl.offsetSeed)
if sl.skipJitterOffsetting {
offset = time.Duration(0)
}
return sl.initialScrapeOffset + offset
}
func (sl *scrapeLoop) run(errc chan<- error) {
if !sl.skipOffsetting {
var (
last time.Time
alignedScrapeTime = time.Now().Round(0)
ticker = time.NewTicker(sl.interval)
)
defer func() {
if sl.scrapeOnShutdown {
last = sl.scrapeAndReport(last, time.Now().Round(0), errc)
}
// Let the stop() know it can continue.
close(sl.stopped)
if sl.parentCtx.Err() == nil {
if !sl.disabledEndOfRunStalenessMarkers.Load() {
sl.endOfRunStaleness(last, ticker, sl.interval)
}
}
ticker.Stop()
}()
// Initial offset and jitter offset, if any.
offset := sl.getScrapeOffset()
if offset > 0 {
select {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
case <-time.After(offset):
// Continue after a scraping offset.
case <-sl.ctx.Done():
close(sl.stopped)
return
}
}
var last time.Time
alignedScrapeTime := time.Now().Round(0)
ticker := time.NewTicker(sl.interval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
return
default:
}
@ -1288,20 +1318,11 @@ mainLoop:
last = sl.scrapeAndReport(last, scrapeTime, errc)
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
return
case <-ticker.C:
}
}
close(sl.stopped)
if !sl.disabledEndOfRunStalenessMarkers.Load() {
sl.endOfRunStaleness(last, ticker, sl.interval)
}
}
func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {

View file

@ -6723,7 +6723,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) {
}
sa := selectAppendable(s, appV2)
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipJitterOffsetting: true}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()

View file

@ -5758,6 +5758,93 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, h.Close())
}
// TestWALReplayMmapsChunks is a regression test ensuring that
// chunks are mmapped during WAL replay, not accumulated in the
// in-memory linked list.
func TestWALReplayMmapsChunks(t *testing.T) {
for _, tc := range []struct {
name string
append func(app storage.Appender, l labels.Labels, ts int64) error
}{
{
name: "floats",
append: func(app storage.Appender, l labels.Labels, ts int64) error {
_, err := app.Append(0, l, ts, float64(ts))
return err
},
},
{
name: "histograms",
append: func(app storage.Appender, l labels.Labels, ts int64) error {
_, err := app.AppendHistogram(0, l, ts, tsdbutil.GenerateTestHistogram(ts), nil)
return err
},
},
{
name: "float histograms",
append: func(app storage.Appender, l labels.Labels, ts int64) error {
_, err := app.AppendHistogram(0, l, ts, nil, tsdbutil.GenerateTestFloatHistogram(ts))
return err
},
},
} {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
h, err := NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err)
require.NoError(t, h.Init(0))
l := labels.FromStrings("foo", "bar")
// Append 250 samples at 1-minute intervals. With ChunkRange=1000 and
// DefaultSamplesPerChunk=120, this creates multiple chunks that should
// be mmapped during WAL replay.
app := h.Appender(context.Background())
for i := range 250 {
require.NoError(t, tc.append(app, l, int64(i)*time.Minute.Milliseconds()))
}
require.NoError(t, app.Commit())
require.NoError(t, h.Close())
// Remove mmapped chunk files so WAL replay must recreate all chunks.
require.NoError(t, os.RemoveAll(filepath.Join(dir, "chunks_head")))
// Reopen Head — WAL replay happens in Init.
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
h, err = NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err)
require.NoError(t, h.Init(0))
ms, ok, err := h.getOrCreate(l.Hash(), l, false)
require.NoError(t, err)
require.False(t, ok)
require.NotNil(t, ms)
// Chunks must be mmapped during replay, not left in the head linked list.
require.NotEmpty(t, ms.mmappedChunks, "expected chunks to be mmapped during WAL replay")
require.Equal(t, 1, ms.headChunks.len(), "expected only one head chunk after replay")
// Verify each mmapped chunk is readable from disk.
for _, m := range ms.mmappedChunks {
chk, err := h.chunkDiskMapper.Chunk(m.ref)
require.NoError(t, err)
require.Equal(t, int(m.numSamples), chk.NumSamples())
}
require.NoError(t, h.Close())
})
}
}
func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
h, _ := newTestHead(t, 1000, compression.None, false)

View file

@ -724,6 +724,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
_ = ms.mmapChunks(h.chunkDiskMapper)
}
if s.t > maxt {
maxt = s.t

View file

@ -83,8 +83,6 @@ const formatLabels = (labels: { [key: string]: string }): string => `
const tooltipPlugin = (useLocalTime: boolean, data: AlignedData) => {
let over: HTMLDivElement;
let boundingLeft: number;
let boundingTop: number;
let selectedSeriesIdx: number | null = null;
const overlay = document.createElement("div");
@ -111,12 +109,6 @@ const tooltipPlugin = (useLocalTime: boolean, data: AlignedData) => {
destroy: () => {
overlay.remove();
},
// When the chart is resized, store the bounding box of the overlay.
setSize: () => {
const bbox = over.getBoundingClientRect();
boundingLeft = bbox.left;
boundingTop = bbox.top;
},
// When a series is selected by hovering close to it, store the
// index of the selected series, so we can update the hover tooltip
// in setCursor.
@ -150,8 +142,12 @@ const tooltipPlugin = (useLocalTime: boolean, data: AlignedData) => {
}
const color = series.stroke(u, selectedSeriesIdx);
const x = left + boundingLeft;
const y = top + boundingTop;
// Get the bounding rect fresh on every cursor move to account for
// page scrolling, which would otherwise cause a growing Y offset
// for charts further down the page.
const bbox = over.getBoundingClientRect();
const x = left + bbox.left;
const y = top + bbox.top;
overlay.innerHTML = `
<div class="date">${formatTimestamp(ts, useLocalTime)}</div>