diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 9c12a31ab3..e33e0e39c7 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1545,6 +1545,184 @@ func TestPromTextToProto(t *testing.T) { require.Equal(t, "promhttp_metric_handler_requests_total", got[236]) } +// TestScrapeLoopAppend_WithStorage tests appends and storage integration for the large input files that are also used in +// benchmarks. +func TestScrapeLoopAppend_WithStorage(t *testing.T) { + ts := time.Now() + + for _, appV2 := range []bool{false, true} { + for _, tc := range []struct { + name string + parsableText []byte + + expectedSamplesLen int + testAppendedSamples func(t *testing.T, committed []sample) + testExemplars func(t *testing.T, eq storage.ExemplarQuerier) + }{ + { + name: "1Fam2000Gauges", + parsableText: makeTestGauges(2000), + + expectedSamplesLen: 2000, + testAppendedSamples: func(t *testing.T, committed []sample) { + var expectedMF string + if appV2 { + // Only AppenderV2 supports metric family passing. + expectedMF = "metric_a" + } + // Verify a few samples. + testutil.RequireEqual(t, sample{ + MF: expectedMF, + M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "help text"}, + L: labels.FromStrings(model.MetricNameLabel, "metric_a", "foo", "0", "bar", "0"), V: 1, T: timestamp.FromTime(ts), + }, committed[0]) + testutil.RequireEqual(t, sample{ + MF: expectedMF, + M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "help text"}, + L: labels.FromStrings(model.MetricNameLabel, "metric_a", "foo", "1245", "bar", "124500"), V: 1, T: timestamp.FromTime(ts), + }, committed[1245]) + testutil.RequireEqual(t, sample{ + MF: expectedMF, + M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "help text"}, + L: labels.FromStrings(model.MetricNameLabel, "metric_a", "foo", "1999", "bar", "199900"), V: 1, T: timestamp.FromTime(ts), + }, committed[len(committed)-1]) + }, + }, + { + name: "237FamsAllTypes", + parsableText: readTextParseTestMetrics(t), + + expectedSamplesLen: 1857, + testAppendedSamples: func(t *testing.T, committed []sample) { + // Verify a few samples. + testutil.RequireEqual(t, sample{ + MF: func() string { + if !appV2 { + return "" + } + return "go_gc_gomemlimit_bytes" + }(), + M: metadata.Metadata{Type: model.MetricTypeGauge, Help: "Go runtime memory limit configured by the user, otherwise math.MaxInt64. This value is set by the GOMEMLIMIT environment variable, and the runtime/debug.SetMemoryLimit function. Sourced from /gc/gomemlimit:bytes"}, + L: labels.FromStrings(model.MetricNameLabel, "go_gc_gomemlimit_bytes"), V: 9.03676723e+08, T: timestamp.FromTime(ts), + }, committed[11]) + testutil.RequireEqual(t, sample{ + MF: func() string { + if !appV2 { + return "" + } + return "prometheus_http_request_duration_seconds" + }(), + M: metadata.Metadata{Type: model.MetricTypeHistogram, Help: "Histogram of latencies for HTTP requests."}, + L: labels.FromStrings(model.MetricNameLabel, "prometheus_http_request_duration_seconds_bucket", "handler", "/api/v1/query_range", "le", "120.0"), V: 118157, T: timestamp.FromTime(ts), + }, committed[448]) + testutil.RequireEqual(t, sample{ + MF: func() string { + if !appV2 { + return "" + } + return "promhttp_metric_handler_requests_total" + }(), + M: metadata.Metadata{Type: model.MetricTypeCounter, Help: "Total number of scrapes by HTTP status code."}, + L: labels.FromStrings(model.MetricNameLabel, "promhttp_metric_handler_requests_total", "code", "503"), V: 0, T: timestamp.FromTime(ts), + }, committed[len(committed)-1]) + }, + }, + { + name: "100HistsWithExemplars", + parsableText: makeTestHistogramsWithExemplars(100), + + expectedSamplesLen: 24 * 100, + testAppendedSamples: func(t *testing.T, committed []sample) { + // Verify a few samples. + m := metadata.Metadata{Type: model.MetricTypeHistogram, Help: "RPC latency distributions."} + testutil.RequireEqual(t, sample{ + MF: func() string { + if !appV2 { + return "" + } + return "rpc_durations_histogram0_seconds" + }(), + M: m, L: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram0_seconds_bucket", "le", "0.0003100000000000002"), V: 15, T: timestamp.FromTime(ts), + ES: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "9818"), Value: 0.0002791130914009552, Ts: 1726839814982, HasTs: true}, + }, + }, committed[13]) + testutil.RequireEqual(t, sample{ + MF: func() string { + if !appV2 { + return "" + } + return "rpc_durations_histogram49_seconds" + }(), + M: m, L: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram49_seconds_sum"), V: -8.452185437166741e-05, T: timestamp.FromTime(ts), + }, committed[24*50-3]) + + // This series does not have metadata, nor metric family, because of isSeriesPartOfFamily bug and OpenMetric 1.0 limitations around _created series. + // TODO(bwplotka): Fix with https://github.com/prometheus/prometheus/issues/17900 + testutil.RequireEqual(t, sample{ + L: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram99_seconds_created"), V: 1.726839813016302e+09, T: timestamp.FromTime(ts), + }, committed[len(committed)-1]) + }, + testExemplars: func(t *testing.T, eq storage.ExemplarQuerier) { + er, err := eq.Select(math.MinInt64, math.MaxInt64, nil) + require.NoError(t, err) + + // 12 out of 24 histogram series have exemplars. + require.Len(t, er, 12*100) + testutil.RequireEqual(t, exemplar.QueryResult{ + SeriesLabels: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram0_seconds_bucket", "le", "0.0003100000000000002"), + Exemplars: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "9818"), Value: 0.0002791130914009552, Ts: 1726839814982, HasTs: true}, + }, + }, er[10]) + testutil.RequireEqual(t, exemplar.QueryResult{ + SeriesLabels: labels.FromStrings(model.MetricNameLabel, "rpc_durations_histogram9_seconds_bucket", "le", "1.0000000000000216e-05"), + Exemplars: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "19206"), Value: -4.6156147425468016e-05, Ts: 1726839815133, HasTs: true}, + }, + }, er[len(er)-1]) + }, + }, + } { + t.Run(fmt.Sprintf("appV2=%v/data=%v", appV2, tc.name), func(t *testing.T) { + s := teststorage.New(t, func(opt *tsdb.Options) { + opt.EnableMetadataWALRecords = true + opt.EnableExemplarStorage = true + opt.MaxExemplars = 1e5 + }) + t.Cleanup(func() { _ = s.Close() }) + + appTest := teststorage.NewAppendable().Then(s) + sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2)) + app := sl.appender() + + _, _, _, err := app.append(tc.parsableText, "application/openmetrics-text", ts) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Check the recorded samples on the Appender layer. + require.Nil(t, appTest.PendingSamples()) + require.Nil(t, appTest.RolledbackSamples()) + + got := appTest.ResultSamples() + require.Len(t, got, tc.expectedSamplesLen) + tc.testAppendedSamples(t, got) + + // Check basic storage stats. + stats := s.Head().Stats(model.MetricNameLabel, 2000) + require.Equal(t, tc.expectedSamplesLen, int(stats.NumSeries)) + + if tc.testExemplars != nil { + // Check exemplars. + eq, err := s.ExemplarQuerier(t.Context()) + require.NoError(t, err) + tc.testExemplars(t, eq) + } + }) + } + } +} + // BenchmarkScrapeLoopAppend benchmarks scrape appends for typical cases. // // Benchmark compares append function run across 4 dimensions: @@ -1569,7 +1747,7 @@ func BenchmarkScrapeLoopAppend(b *testing.B) { name string parsableText []byte }{ - {name: "1Fam1000Gauges", parsableText: makeTestGauges(2000)}, // ~68.1 KB, ~77.9 KB in proto. + {name: "1Fam2000Gauges", parsableText: makeTestGauges(2000)}, // ~68.1 KB, ~77.9 KB in proto. {name: "237FamsAllTypes", parsableText: readTextParseTestMetrics(b)}, // ~185.7 KB, ~70.6 KB in proto. } { b.Run(fmt.Sprintf("appV2=%v/appendMetadataToWAL=%v/data=%v", appV2, appendMetadataToWAL, data.name), func(b *testing.B) { @@ -3225,9 +3403,7 @@ metric: < } sl.alwaysScrapeClassicHist = test.alwaysScrapeClassicHist // This test does not care about metadata. - // Having this true would mean we need to add metadata to sample - // expectations. - // TODO(bwplotka): Add cases for append metadata to WAL and pass metadata + // TODO(bwplotka): Add metadata expectations and turn it on. sl.appendMetadataToWAL = false }) app := sl.appender() diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index a8e1306955..08cc4970a2 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -18,12 +18,8 @@ import ( "os" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/testutil" ) @@ -62,20 +58,20 @@ func NewWithError(o ...Option) (*TestStorage, error) { if err != nil { return nil, fmt.Errorf("opening test storage: %w", err) } - reg := prometheus.NewRegistry() - eMetrics := tsdb.NewExemplarMetrics(reg) + // reg := prometheus.NewRegistry() + // eMetrics := tsdb.NewExemplarMetrics(reg) - es, err := tsdb.NewCircularExemplarStorage(10, eMetrics, opts.OutOfOrderTimeWindow) - if err != nil { - return nil, fmt.Errorf("opening test exemplar storage: %w", err) - } - return &TestStorage{DB: db, exemplarStorage: es, dir: dir}, nil + //es, err := tsdb.NewCircularExemplarStorage(10, eMetrics, opts.OutOfOrderTimeWindow) + //if err != nil { + // return nil, fmt.Errorf("opening test exemplar storage: %w", err) + //} + return &TestStorage{DB: db, dir: dir}, nil } type TestStorage struct { *tsdb.DB - exemplarStorage tsdb.ExemplarStorage - dir string + // exemplarStorage tsdb.ExemplarStorage + dir string } func (s TestStorage) Close() error { @@ -85,14 +81,14 @@ func (s TestStorage) Close() error { return os.RemoveAll(s.dir) } -func (s TestStorage) ExemplarAppender() storage.ExemplarAppender { - return s -} - -func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable { - return s.exemplarStorage -} - -func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - return ref, s.exemplarStorage.AddExemplar(l, e) -} +//func (s TestStorage) ExemplarAppender() storage.ExemplarAppender { +// return s +//} +// +//func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable { +// return s.exemplarStorage +//} +// +//func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { +// return ref, s.exemplarStorage.AddExemplar(l, e) +//} diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 39c1fa6080..ef70a1965e 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -401,7 +401,7 @@ var sampleFlagMap = map[string]string{ } func TestEndpoints(t *testing.T) { - storage := promqltest.LoadedStorage(t, ` + s := promqltest.LoadedStorage(t, ` load 1m test_metric1{foo="bar"} 0+100x100 test_metric1{foo="boo"} 1+0x100 @@ -414,7 +414,7 @@ func TestEndpoints(t *testing.T) { test_metric5{"host.name"="localhost"} 1+0x100 test_metric5{"junk\n{},=: chars"="bar"} 1+0x100 `) - t.Cleanup(func() { storage.Close() }) + t.Cleanup(func() { s.Close() }) start := time.Unix(0, 0) exemplars := []exemplar.QueryResult{ @@ -459,10 +459,13 @@ func TestEndpoints(t *testing.T) { }, }, } + + app := s.AppenderV2(t.Context()) for _, ed := range exemplars { - _, err := storage.AppendExemplar(0, ed.SeriesLabels, ed.Exemplars[0]) - require.NoError(t, err, "failed to add exemplar: %+v", ed.Exemplars[0]) + _, err := app.Append(0, ed.SeriesLabels, 0, 0, 0, nil, nil, storage.AOptions{Exemplars: ed.Exemplars}) + require.NoError(t, err) } + require.NoError(t, app.Commit()) now := time.Now() @@ -480,9 +483,9 @@ func TestEndpoints(t *testing.T) { testTargetRetriever := setupTestTargetRetriever(t) api := &API{ - Queryable: storage, + Queryable: s, QueryEngine: ng, - ExemplarQueryable: storage.ExemplarQueryable(), + ExemplarQueryable: s, targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), flagsMap: sampleFlagMap, @@ -491,14 +494,14 @@ func TestEndpoints(t *testing.T) { ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, rulesRetriever: algr.toFactory(), } - testEndpoints(t, api, testTargetRetriever, storage, true) + testEndpoints(t, api, testTargetRetriever, s, true) }) // Run all the API tests against an API that is wired to forward queries via // the remote read client to a test server, which in turn sends them to the // data from the test storage. t.Run("remote", func(t *testing.T) { - server := setupRemote(storage) + server := setupRemote(s) defer server.Close() u, err := url.Parse(server.URL) @@ -545,7 +548,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: remote, QueryEngine: ng, - ExemplarQueryable: storage.ExemplarQueryable(), + ExemplarQueryable: s, targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), flagsMap: sampleFlagMap, @@ -554,7 +557,7 @@ func TestEndpoints(t *testing.T) { ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, rulesRetriever: algr.toFactory(), } - testEndpoints(t, api, testTargetRetriever, storage, false) + testEndpoints(t, api, testTargetRetriever, s, false) }) } @@ -671,7 +674,7 @@ func TestGetSeries(t *testing.T) { func TestQueryExemplars(t *testing.T) { start := time.Unix(0, 0) - storage := promqltest.LoadedStorage(t, ` + s := promqltest.LoadedStorage(t, ` load 1m test_metric1{foo="bar"} 0+100x100 test_metric1{foo="boo"} 1+0x100 @@ -682,12 +685,12 @@ func TestQueryExemplars(t *testing.T) { test_metric4{foo="boo", dup="1"} 1+0x100 test_metric4{foo="boo"} 1+0x100 `) - t.Cleanup(func() { storage.Close() }) + t.Cleanup(func() { _ = s.Close() }) api := &API{ - Queryable: storage, + Queryable: s, QueryEngine: testEngine(t), - ExemplarQueryable: storage.ExemplarQueryable(), + ExemplarQueryable: s, } request := func(method string, qs url.Values) (*http.Request, error) { @@ -765,15 +768,15 @@ func TestQueryExemplars(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - es := storage + es := s ctx := context.Background() - for _, te := range tc.exemplars { - for _, e := range te.Exemplars { - _, err := es.AppendExemplar(0, te.SeriesLabels, e) - require.NoError(t, err) - } + app := es.AppenderV2(t.Context()) + for _, ed := range tc.exemplars { + _, err := app.Append(0, ed.SeriesLabels, 0, 0, 0, nil, nil, storage.AOptions{Exemplars: ed.Exemplars}) + require.NoError(t, err) } + require.NoError(t, app.Commit()) req, err := request(http.MethodGet, tc.query) require.NoError(t, err) @@ -1119,7 +1122,7 @@ func setupRemote(s storage.Storage) *httptest.Server { return httptest.NewServer(handler) } -func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.ExemplarStorage, testLabelAPI bool) { +func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, s *teststorage.TestStorage, testLabelAPI bool) { start := time.Unix(0, 0) type targetMetadata struct { @@ -3762,15 +3765,15 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E tr.ResetMetadataStore() for _, tm := range test.metadata { - tr.SetMetadataStoreForTargets(tm.identifier, &testMetaStore{Metadata: tm.metadata}) + require.NoError(t, tr.SetMetadataStoreForTargets(tm.identifier, &testMetaStore{Metadata: tm.metadata})) } - for _, te := range test.exemplars { - for _, e := range te.Exemplars { - _, err := es.AppendExemplar(0, te.SeriesLabels, e) - require.NoError(t, err) - } + app := s.AppenderV2(t.Context()) + for _, ed := range test.exemplars { + _, err := app.Append(0, ed.SeriesLabels, 0, 0, 0, nil, nil, storage.AOptions{Exemplars: ed.Exemplars}) + require.NoError(t, err) } + require.NoError(t, app.Commit()) res := test.endpoint(req.WithContext(ctx)) assertAPIError(t, res.err, test.errType) @@ -4770,13 +4773,11 @@ func TestExtractQueryOpts(t *testing.T) { // Test query timeout parameter. func TestQueryTimeout(t *testing.T) { - storage := promqltest.LoadedStorage(t, ` + s := promqltest.LoadedStorage(t, ` load 1m test_metric1{foo="bar"} 0+100x100 `) - t.Cleanup(func() { - _ = storage.Close() - }) + t.Cleanup(func() { _ = s.Close() }) now := time.Now() @@ -4796,9 +4797,9 @@ func TestQueryTimeout(t *testing.T) { t.Run(tc.name, func(t *testing.T) { engine := &fakeEngine{} api := &API{ - Queryable: storage, + Queryable: s, QueryEngine: engine, - ExemplarQueryable: storage.ExemplarQueryable(), + ExemplarQueryable: s, alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), flagsMap: sampleFlagMap, now: func() time.Time { return now },