From 695db71c68252646586ba1d90c6f35bb850cc8f2 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Tue, 17 Mar 2026 18:10:10 +0000 Subject: [PATCH] scrape: add test for distribution of scrapes Signed-off-by: Ridwan Sharif --- scrape/scrape_test.go | 106 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 63547869be..432230219b 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -24,6 +24,7 @@ import ( "log/slog" "maps" "math" + "net" "net/http" "net/http/httptest" "net/url" @@ -51,6 +52,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/atomic" "go.uber.org/goleak" + "go.yaml.in/yaml/v2" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" @@ -69,6 +71,7 @@ import ( "github.com/prometheus/prometheus/util/pool" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestMain(m *testing.M) { @@ -6786,3 +6789,106 @@ func TestScrapePoolSetScrapeFailureLoggerRace(t *testing.T) { wg.Wait() } + +func TestScrapeOffsetDistribution(t *testing.T) { + interval := 5 * time.Second + + synctest.Test(t, func(t *testing.T) { + startTime := time.Now() + + listener := newPipeListener() + + var mu sync.Mutex + scrapeTimes := make(map[string][]time.Duration) + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-r.Context().Done(): + return + default: + mu.Lock() + target := r.URL.Path + scrapeTimes[target] = append(scrapeTimes[target], time.Since(startTime)) + mu.Unlock() + + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintln(w, "expected_metric 1") + } + }) + + srv := httptest.NewUnstartedServer(handler) + srv.Listener = listener + srv.Start() + t.Cleanup(srv.Close) + + app := teststorage.NewAppendable() + opts := &Options{ + HTTPClientOptions: []config_util.HTTPClientOption{ + config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { + srvConn, cliConn := net.Pipe() + select { + case listener.conns <- srvConn: + return cliConn, nil + case <-listener.closed: + return nil, net.ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + } + }), + }, + } + scrapeManager, err := NewManager(opts, promslog.NewNopLogger(), nil, app, nil, prometheus.NewRegistry()) + require.NoError(t, err) + + var targets []model.LabelSet + for i := range 5 { + targets = append(targets, model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: model.LabelValue(fmt.Sprintf("target-%d.local", i)), + model.MetricsPathLabel: model.LabelValue(fmt.Sprintf("/metrics/%d", i)), + }) + } + + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{Targets: targets}}, + }) + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(interval), + ScrapeTimeout: model.Duration(interval), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + scrapeManager.reload() + + time.Sleep(22 * time.Second) + synctest.Wait() + + scrapeManager.Stop() + + maxScrapes := 0 + for _, times := range scrapeTimes { + if len(times) > maxScrapes { + maxScrapes = len(times) + } + } + require.Positive(t, maxScrapes, "Expected at least one scrape") + + for i := 0; i < maxScrapes; i++ { + uniqueTimes := make(map[time.Duration]struct{}) + for _, times := range scrapeTimes { + if i < len(times) { + uniqueTimes[times[i]] = struct{}{} + } + } + require.Greater(t, len(uniqueTimes), 2, "Expected targets to be scraped at staggered offsets rather than simultaneously at scrape index %d", i) + } + }) +}