mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
scrape: add test for distribution of scrapes
Signed-off-by: Ridwan Sharif <ridwanmsharif@google.com>
This commit is contained in:
parent
caa250a29c
commit
695db71c68
1 changed files with 106 additions and 0 deletions
|
|
@ -24,6 +24,7 @@ import (
|
|||
"log/slog"
|
||||
"maps"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
|
|
@ -51,6 +52,7 @@ import (
|
|||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/goleak"
|
||||
"go.yaml.in/yaml/v2"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/discovery"
|
||||
|
|
@ -69,6 +71,7 @@ import (
|
|||
"github.com/prometheus/prometheus/util/pool"
|
||||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
"github.com/prometheus/prometheus/util/testutil/synctest"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
|
|
@ -6786,3 +6789,106 @@ func TestScrapePoolSetScrapeFailureLoggerRace(t *testing.T) {
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestScrapeOffsetDistribution(t *testing.T) {
|
||||
interval := 5 * time.Second
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
startTime := time.Now()
|
||||
|
||||
listener := newPipeListener()
|
||||
|
||||
var mu sync.Mutex
|
||||
scrapeTimes := make(map[string][]time.Duration)
|
||||
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
default:
|
||||
mu.Lock()
|
||||
target := r.URL.Path
|
||||
scrapeTimes[target] = append(scrapeTimes[target], time.Since(startTime))
|
||||
mu.Unlock()
|
||||
|
||||
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
|
||||
fmt.Fprintln(w, "expected_metric 1")
|
||||
}
|
||||
})
|
||||
|
||||
srv := httptest.NewUnstartedServer(handler)
|
||||
srv.Listener = listener
|
||||
srv.Start()
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
app := teststorage.NewAppendable()
|
||||
opts := &Options{
|
||||
HTTPClientOptions: []config_util.HTTPClientOption{
|
||||
config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
|
||||
srvConn, cliConn := net.Pipe()
|
||||
select {
|
||||
case listener.conns <- srvConn:
|
||||
return cliConn, nil
|
||||
case <-listener.closed:
|
||||
return nil, net.ErrClosed
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}),
|
||||
},
|
||||
}
|
||||
scrapeManager, err := NewManager(opts, promslog.NewNopLogger(), nil, app, nil, prometheus.NewRegistry())
|
||||
require.NoError(t, err)
|
||||
|
||||
var targets []model.LabelSet
|
||||
for i := range 5 {
|
||||
targets = append(targets, model.LabelSet{
|
||||
model.SchemeLabel: "http",
|
||||
model.AddressLabel: model.LabelValue(fmt.Sprintf("target-%d.local", i)),
|
||||
model.MetricsPathLabel: model.LabelValue(fmt.Sprintf("/metrics/%d", i)),
|
||||
})
|
||||
}
|
||||
|
||||
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
|
||||
"test": {{Targets: targets}},
|
||||
})
|
||||
|
||||
cfg := &config.Config{
|
||||
GlobalConfig: config.GlobalConfig{
|
||||
ScrapeInterval: model.Duration(interval),
|
||||
ScrapeTimeout: model.Duration(interval),
|
||||
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
|
||||
},
|
||||
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
|
||||
}
|
||||
cfgText, err := yaml.Marshal(*cfg)
|
||||
require.NoError(t, err)
|
||||
cfg = loadConfiguration(t, string(cfgText))
|
||||
require.NoError(t, scrapeManager.ApplyConfig(cfg))
|
||||
|
||||
scrapeManager.reload()
|
||||
|
||||
time.Sleep(22 * time.Second)
|
||||
synctest.Wait()
|
||||
|
||||
scrapeManager.Stop()
|
||||
|
||||
maxScrapes := 0
|
||||
for _, times := range scrapeTimes {
|
||||
if len(times) > maxScrapes {
|
||||
maxScrapes = len(times)
|
||||
}
|
||||
}
|
||||
require.Positive(t, maxScrapes, "Expected at least one scrape")
|
||||
|
||||
for i := 0; i < maxScrapes; i++ {
|
||||
uniqueTimes := make(map[time.Duration]struct{})
|
||||
for _, times := range scrapeTimes {
|
||||
if i < len(times) {
|
||||
uniqueTimes[times[i]] = struct{}{}
|
||||
}
|
||||
}
|
||||
require.Greater(t, len(uniqueTimes), 2, "Expected targets to be scraped at staggered offsets rather than simultaneously at scrape index %d", i)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue