tests(scrape): add TestScrapeLoopAppend_WithStorage

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-01-20 15:19:59 +00:00
parent 664b255699
commit 656f20b196
3 changed files with 234 additions and 61 deletions

View file

@ -1545,6 +1545,184 @@ func TestPromTextToProto(t *testing.T) {
require.Equal(t, "promhttp_metric_handler_requests_total", got[236])
}
// TestScrapeLoopAppend_WithStorage tests appends and storage integration for the large input files that are also used in
// benchmarks.
func TestScrapeLoopAppend_WithStorage(t *testing.T) {
ts := time.Now()
for _, appV2 := range []bool{false, true} {
for _, tc := range []struct {
name string
parsableText []byte
expectedSamplesLen int
testAppendedSamples func(t *testing.T, committed []sample)
testExemplars func(t *testing.T, eq storage.ExemplarQuerier)
}{
{
name: "1Fam2000Gauges",
parsableText: makeTestGauges(2000),
expectedSamplesLen: 2000,
testAppendedSamples: func(t *testing.T, committed []sample) {
var expectedMF string
if appV2 {
// Only AppenderV2 supports metric family passing.
expectedMF = "metric_a"
}
// Verify a few samples.
testutil.RequireEqual(t, sample{
MF: expectedMF,
M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "help text"},
L: labels.FromStrings(model.MetricNameLabel, "metric_a", "foo", "0", "bar", "0"), V: 1, T: timestamp.FromTime(ts),
}, committed[0])
testutil.RequireEqual(t, sample{
MF: expectedMF,
M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "help text"},
L: labels.FromStrings(model.MetricNameLabel, "metric_a", "foo", "1245", "bar", "124500"), V: 1, T: timestamp.FromTime(ts),
}, committed[1245])
testutil.RequireEqual(t, sample{
MF: expectedMF,
M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "help text"},
L: labels.FromStrings(model.MetricNameLabel, "metric_a", "foo", "1999", "bar", "199900"), V: 1, T: timestamp.FromTime(ts),
}, committed[len(committed)-1])
},
},
{
name: "237FamsAllTypes",
parsableText: readTextParseTestMetrics(t),
expectedSamplesLen: 1857,
testAppendedSamples: func(t *testing.T, committed []sample) {
// Verify a few samples.
testutil.RequireEqual(t, sample{
MF: func() string {
if !appV2 {
return ""
}
return "go_gc_gomemlimit_bytes"
}(),
M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "Go runtime memory limit configured by the user, otherwise math.MaxInt64. This value is set by the GOMEMLIMIT environment variable, and the runtime/debug.SetMemoryLimit function. Sourced from /gc/gomemlimit:bytes"},
L: labels.FromStrings(model.MetricNameLabel, "go_gc_gomemlimit_bytes"), V: 9.03676723e+08, T: timestamp.FromTime(ts),
}, committed[11])
testutil.RequireEqual(t, sample{
MF: func() string {
if !appV2 {
return ""
}
return "prometheus_http_request_duration_seconds"
}(),
M: metadata.Metadata{Type: model.MetricTypeHistogram, Help: "Histogram of latencies for HTTP requests."},
L: labels.FromStrings(model.MetricNameLabel, "prometheus_http_request_duration_seconds_bucket", "handler", "/api/v1/query_range", "le", "120.0"), V: 118157, T: timestamp.FromTime(ts),
}, committed[448])
testutil.RequireEqual(t, sample{
MF: func() string {
if !appV2 {
return ""
}
return "promhttp_metric_handler_requests_total"
}(),
M: metadata.Metadata{Type: model.MetricTypeCounter, Help: "Total number of scrapes by HTTP status code."},
L: labels.FromStrings(model.MetricNameLabel, "promhttp_metric_handler_requests_total", "code", "503"), V: 0, T: timestamp.FromTime(ts),
}, committed[len(committed)-1])
},
},
{
name: "100HistsWithExemplars",
parsableText: makeTestHistogramsWithExemplars(100),
expectedSamplesLen: 24 * 100,
testAppendedSamples: func(t *testing.T, committed []sample) {
// Verify a few samples.
m := metadata.Metadata{Type: model.MetricTypeHistogram, Help: "RPC latency distributions."}
testutil.RequireEqual(t, sample{
MF: func() string {
if !appV2 {
return ""
}
return "rpc_durations_histogram0_seconds"
}(),
M: m, L: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram0_seconds_bucket", "le", "0.0003100000000000002"), V: 15, T: timestamp.FromTime(ts),
ES: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "9818"), Value: 0.0002791130914009552, Ts: 1726839814982, HasTs: true},
},
}, committed[13])
testutil.RequireEqual(t, sample{
MF: func() string {
if !appV2 {
return ""
}
return "rpc_durations_histogram49_seconds"
}(),
M: m, L: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram49_seconds_sum"), V: -8.452185437166741e-05, T: timestamp.FromTime(ts),
}, committed[24*50-3])
// This series does not have metadata, nor metric family, because of isSeriesPartOfFamily bug and OpenMetric 1.0 limitations around _created series.
// TODO(bwplotka): Fix with https://github.com/prometheus/prometheus/issues/17900
testutil.RequireEqual(t, sample{
L: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram99_seconds_created"), V: 1.726839813016302e+09, T: timestamp.FromTime(ts),
}, committed[len(committed)-1])
},
testExemplars: func(t *testing.T, eq storage.ExemplarQuerier) {
er, err := eq.Select(math.MinInt64, math.MaxInt64, nil)
require.NoError(t, err)
// 12 out of 24 histogram series have exemplars.
require.Len(t, er, 12*100)
testutil.RequireEqual(t, exemplar.QueryResult{
SeriesLabels: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram0_seconds_bucket", "le", "0.0003100000000000002"),
Exemplars: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "9818"), Value: 0.0002791130914009552, Ts: 1726839814982, HasTs: true},
},
}, er[10])
testutil.RequireEqual(t, exemplar.QueryResult{
SeriesLabels: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram9_seconds_bucket", "le", "1.0000000000000216e-05"),
Exemplars: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "19206"), Value: -4.6156147425468016e-05, Ts: 1726839815133, HasTs: true},
},
}, er[len(er)-1])
},
},
} {
t.Run(fmt.Sprintf("appV2=%v/data=%v", appV2, tc.name), func(t *testing.T) {
s := teststorage.New(t, func(opt *tsdb.Options) {
opt.EnableMetadataWALRecords = true
opt.EnableExemplarStorage = true
opt.MaxExemplars = 1e5
})
t.Cleanup(func() { _ = s.Close() })
appTest := teststorage.NewAppendable().Then(s)
sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2))
app := sl.appender()
_, _, _, err := app.append(tc.parsableText, "application/openmetrics-text", ts)
require.NoError(t, err)
require.NoError(t, app.Commit())
// Check the recorded samples on the Appender layer.
require.Nil(t, appTest.PendingSamples())
require.Nil(t, appTest.RolledbackSamples())
got := appTest.ResultSamples()
require.Len(t, got, tc.expectedSamplesLen)
tc.testAppendedSamples(t, got)
// Check basic storage stats.
stats := s.Head().Stats(model.MetricNameLabel, 2000)
require.Equal(t, tc.expectedSamplesLen, int(stats.NumSeries))
if tc.testExemplars != nil {
// Check exemplars.
eq, err := s.ExemplarQuerier(t.Context())
require.NoError(t, err)
tc.testExemplars(t, eq)
}
})
}
}
}
// BenchmarkScrapeLoopAppend benchmarks scrape appends for typical cases.
//
// Benchmark compares append function run across 4 dimensions:
@ -1569,7 +1747,7 @@ func BenchmarkScrapeLoopAppend(b *testing.B) {
name string
parsableText []byte
}{
{name: "1Fam1000Gauges", parsableText: makeTestGauges(2000)}, // ~68.1 KB, ~77.9 KB in proto.
{name: "1Fam2000Gauges", parsableText: makeTestGauges(2000)}, // ~68.1 KB, ~77.9 KB in proto.
{name: "237FamsAllTypes", parsableText: readTextParseTestMetrics(b)}, // ~185.7 KB, ~70.6 KB in proto.
} {
b.Run(fmt.Sprintf("appV2=%v/appendMetadataToWAL=%v/data=%v", appV2, appendMetadataToWAL, data.name), func(b *testing.B) {
@ -3225,9 +3403,7 @@ metric: <
}
sl.alwaysScrapeClassicHist = test.alwaysScrapeClassicHist
// This test does not care about metadata.
// Having this true would mean we need to add metadata to sample
// expectations.
// TODO(bwplotka): Add cases for append metadata to WAL and pass metadata
// TODO(bwplotka): Add metadata expectations and turn it on.
sl.appendMetadataToWAL = false
})
app := sl.appender()

View file

@ -18,12 +18,8 @@ import (
"os"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/testutil"
)
@ -62,20 +58,20 @@ func NewWithError(o ...Option) (*TestStorage, error) {
if err != nil {
return nil, fmt.Errorf("opening test storage: %w", err)
}
reg := prometheus.NewRegistry()
eMetrics := tsdb.NewExemplarMetrics(reg)
// reg := prometheus.NewRegistry()
// eMetrics := tsdb.NewExemplarMetrics(reg)
es, err := tsdb.NewCircularExemplarStorage(10, eMetrics, opts.OutOfOrderTimeWindow)
if err != nil {
return nil, fmt.Errorf("opening test exemplar storage: %w", err)
}
return &TestStorage{DB: db, exemplarStorage: es, dir: dir}, nil
//es, err := tsdb.NewCircularExemplarStorage(10, eMetrics, opts.OutOfOrderTimeWindow)
//if err != nil {
// return nil, fmt.Errorf("opening test exemplar storage: %w", err)
//}
return &TestStorage{DB: db, dir: dir}, nil
}
type TestStorage struct {
*tsdb.DB
exemplarStorage tsdb.ExemplarStorage
dir string
// exemplarStorage tsdb.ExemplarStorage
dir string
}
func (s TestStorage) Close() error {
@ -85,14 +81,14 @@ func (s TestStorage) Close() error {
return os.RemoveAll(s.dir)
}
func (s TestStorage) ExemplarAppender() storage.ExemplarAppender {
return s
}
func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
return s.exemplarStorage
}
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
return ref, s.exemplarStorage.AddExemplar(l, e)
}
//func (s TestStorage) ExemplarAppender() storage.ExemplarAppender {
// return s
//}
//
//func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
// return s.exemplarStorage
//}
//
//func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
// return ref, s.exemplarStorage.AddExemplar(l, e)
//}

View file

@ -401,7 +401,7 @@ var sampleFlagMap = map[string]string{
}
func TestEndpoints(t *testing.T) {
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
test_metric1{foo="boo"} 1+0x100
@ -414,7 +414,7 @@ func TestEndpoints(t *testing.T) {
test_metric5{"host.name"="localhost"} 1+0x100
test_metric5{"junk\n{},=: chars"="bar"} 1+0x100
`)
t.Cleanup(func() { storage.Close() })
t.Cleanup(func() { s.Close() })
start := time.Unix(0, 0)
exemplars := []exemplar.QueryResult{
@ -459,10 +459,13 @@ func TestEndpoints(t *testing.T) {
},
},
}
app := s.AppenderV2(t.Context())
for _, ed := range exemplars {
_, err := storage.AppendExemplar(0, ed.SeriesLabels, ed.Exemplars[0])
require.NoError(t, err, "failed to add exemplar: %+v", ed.Exemplars[0])
_, err := app.Append(0, ed.SeriesLabels, 0, 0, 0, nil, nil, storage.AOptions{Exemplars: ed.Exemplars})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
now := time.Now()
@ -480,9 +483,9 @@ func TestEndpoints(t *testing.T) {
testTargetRetriever := setupTestTargetRetriever(t)
api := &API{
Queryable: storage,
Queryable: s,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
flagsMap: sampleFlagMap,
@ -491,14 +494,14 @@ func TestEndpoints(t *testing.T) {
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
rulesRetriever: algr.toFactory(),
}
testEndpoints(t, api, testTargetRetriever, storage, true)
testEndpoints(t, api, testTargetRetriever, s, true)
})
// Run all the API tests against an API that is wired to forward queries via
// the remote read client to a test server, which in turn sends them to the
// data from the test storage.
t.Run("remote", func(t *testing.T) {
server := setupRemote(storage)
server := setupRemote(s)
defer server.Close()
u, err := url.Parse(server.URL)
@ -545,7 +548,7 @@ func TestEndpoints(t *testing.T) {
api := &API{
Queryable: remote,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
flagsMap: sampleFlagMap,
@ -554,7 +557,7 @@ func TestEndpoints(t *testing.T) {
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
rulesRetriever: algr.toFactory(),
}
testEndpoints(t, api, testTargetRetriever, storage, false)
testEndpoints(t, api, testTargetRetriever, s, false)
})
}
@ -671,7 +674,7 @@ func TestGetSeries(t *testing.T) {
func TestQueryExemplars(t *testing.T) {
start := time.Unix(0, 0)
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
test_metric1{foo="boo"} 1+0x100
@ -682,12 +685,12 @@ func TestQueryExemplars(t *testing.T) {
test_metric4{foo="boo", dup="1"} 1+0x100
test_metric4{foo="boo"} 1+0x100
`)
t.Cleanup(func() { storage.Close() })
t.Cleanup(func() { _ = s.Close() })
api := &API{
Queryable: storage,
Queryable: s,
QueryEngine: testEngine(t),
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
}
request := func(method string, qs url.Values) (*http.Request, error) {
@ -765,15 +768,15 @@ func TestQueryExemplars(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
es := storage
es := s
ctx := context.Background()
for _, te := range tc.exemplars {
for _, e := range te.Exemplars {
_, err := es.AppendExemplar(0, te.SeriesLabels, e)
require.NoError(t, err)
}
app := es.AppenderV2(t.Context())
for _, ed := range tc.exemplars {
_, err := app.Append(0, ed.SeriesLabels, 0, 0, 0, nil, nil, storage.AOptions{Exemplars: ed.Exemplars})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
req, err := request(http.MethodGet, tc.query)
require.NoError(t, err)
@ -1119,7 +1122,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
return httptest.NewServer(handler)
}
func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.ExemplarStorage, testLabelAPI bool) {
func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, s *teststorage.TestStorage, testLabelAPI bool) {
start := time.Unix(0, 0)
type targetMetadata struct {
@ -3762,15 +3765,15 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
tr.ResetMetadataStore()
for _, tm := range test.metadata {
tr.SetMetadataStoreForTargets(tm.identifier, &testMetaStore{Metadata: tm.metadata})
require.NoError(t, tr.SetMetadataStoreForTargets(tm.identifier, &testMetaStore{Metadata: tm.metadata}))
}
for _, te := range test.exemplars {
for _, e := range te.Exemplars {
_, err := es.AppendExemplar(0, te.SeriesLabels, e)
require.NoError(t, err)
}
app := s.AppenderV2(t.Context())
for _, ed := range test.exemplars {
_, err := app.Append(0, ed.SeriesLabels, 0, 0, 0, nil, nil, storage.AOptions{Exemplars: ed.Exemplars})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
res := test.endpoint(req.WithContext(ctx))
assertAPIError(t, res.err, test.errType)
@ -4770,13 +4773,11 @@ func TestExtractQueryOpts(t *testing.T) {
// Test query timeout parameter.
func TestQueryTimeout(t *testing.T) {
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
`)
t.Cleanup(func() {
_ = storage.Close()
})
t.Cleanup(func() { _ = s.Close() })
now := time.Now()
@ -4796,9 +4797,9 @@ func TestQueryTimeout(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
engine := &fakeEngine{}
api := &API{
Queryable: storage,
Queryable: s,
QueryEngine: engine,
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
flagsMap: sampleFlagMap,
now: func() time.Time { return now },