diff --git a/.golangci.yml b/.golangci.yml index 8cb3265f4f..05a23b53b2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -128,8 +128,6 @@ linters: # Disable this check for now since it introduces too many changes in our existing codebase. # See https://pkg.go.dev/golang.org/x/tools/go/analysis/passes/modernize#hdr-Analyzer_omitzero for more details. - omitzero - # Disable waitgroup check until we really move to Go 1.25. - - waitgroup perfsprint: # Optimizes even if it requires an int or uint type cast. int-conversion: true diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 2e4a982382..ae61059af5 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -6,6 +6,7 @@ General maintainers: * Bryan Boreham (bjboreham@gmail.com / @bboreham) * Ayoub Mrini (ayoubmrini424@gmail.com / @machine424) * Julien Pivotto (roidelapluie@prometheus.io / @roidelapluie) +* György Krajcsovits ( / @krajorama) Maintainers for specific parts of the codebase: * `cmd` @@ -18,7 +19,7 @@ Maintainers for specific parts of the codebase: * `storage` * `remote`: Callum Styan ( / @cstyan), Bartłomiej Płotka ( / @bwplotka), Tom Wilkie (tom.wilkie@gmail.com / @tomwilkie), Alex Greenbank ( / @alexgreenbank) * `otlptranslator`: Arthur Silva Sens ( / @ArthurSens), Arve Knudsen ( / @aknuds1), Jesús Vázquez ( / @jesusvazquez) -* `tsdb`: Ganesh Vernekar ( / @codesome), Bartłomiej Płotka ( / @bwplotka), Jesús Vázquez ( / @jesusvazquez), George Krajcsovits ( / @krajorama) +* `tsdb`: Ganesh Vernekar ( / @codesome), Bartłomiej Płotka ( / @bwplotka), Jesús Vázquez ( / @jesusvazquez) * `web` * `ui`: Julius Volz ( / @juliusv) * `module`: Augustin Husson ( / @nexucis) diff --git a/cmd/prometheus/testdata/features.json b/cmd/prometheus/testdata/features.json index c39f60ab33..5fc01aa195 100644 --- a/cmd/prometheus/testdata/features.json +++ b/cmd/prometheus/testdata/features.json @@ -80,6 +80,7 @@ "histogram_count": true, "histogram_fraction": true, "histogram_quantile": true, + "histogram_quantiles": false, 
"histogram_stddev": true, "histogram_stdvar": true, "histogram_sum": true, diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index abb709c31d..183b918ba0 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -372,7 +372,7 @@ func main() { os.Exit(CheckSD(*sdConfigFile, *sdJobName, *sdTimeout, prometheus.DefaultRegisterer)) case checkConfigCmd.FullCommand(): - os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newConfigLintConfig(*checkConfigLint, *checkConfigLintFatal, *checkConfigIgnoreUnknownFields, model.UTF8Validation, model.Duration(*checkLookbackDelta)), *configFiles...)) + os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newConfigLintConfig(*checkConfigLint, *checkConfigLintFatal, *checkConfigIgnoreUnknownFields, model.UTF8Validation, model.Duration(*checkLookbackDelta)), promtoolParser, *configFiles...)) case checkServerHealthCmd.FullCommand(): os.Exit(checkErr(CheckServerStatus(serverURL, checkHealth, httpRoundTripper))) @@ -598,7 +598,7 @@ func CheckServerStatus(serverURL *url.URL, checkEndpoint string, roundTripper ht } // CheckConfig validates configuration files. 
-func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, files ...string) int { +func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, p parser.Parser, files ...string) int { failed := false hasErrors := false @@ -619,7 +619,7 @@ func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, if !checkSyntaxOnly { scrapeConfigsFailed := lintScrapeConfigs(scrapeConfigs, lintSettings) failed = failed || scrapeConfigsFailed - rulesFailed, rulesHaveErrors := checkRules(ruleFiles, lintSettings.rulesLintConfig, parser.NewParser(parser.Options{})) + rulesFailed, rulesHaveErrors := checkRules(ruleFiles, lintSettings.rulesLintConfig, p) failed = failed || rulesFailed hasErrors = hasErrors || rulesHaveErrors } diff --git a/cmd/promtool/main_test.go b/cmd/promtool/main_test.go index 3b1730d894..297dd35d70 100644 --- a/cmd/promtool/main_test.go +++ b/cmd/promtool/main_test.go @@ -706,20 +706,21 @@ func TestCheckScrapeConfigs(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // Non-fatal linting. - code := CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, false, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + p := parser.NewParser(parser.Options{}) + code := CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, false, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") require.Equal(t, successExitCode, code, "Non-fatal linting should return success") // Fatal linting. 
- code = CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + code = CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") if tc.expectError { require.Equal(t, lintErrExitCode, code, "Fatal linting should return error") } else { require.Equal(t, successExitCode, code, "Fatal linting should return success when there are no problems") } // Check syntax only, no linting. - code = CheckConfig(false, true, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + code = CheckConfig(false, true, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") require.Equal(t, successExitCode, code, "Fatal linting should return success when checking syntax only") // Lint option "none" should disable linting. 
- code = CheckConfig(false, false, newConfigLintConfig(lintOptionNone+","+lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + code = CheckConfig(false, false, newConfigLintConfig(lintOptionNone+","+lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") require.Equal(t, successExitCode, code, `Fatal linting should return success when lint option "none" is specified`) }) } diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 1aaf87bc42..f43da0e1d0 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -159,17 +159,14 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u batch := lbls[:l] lbls = lbls[l:] - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i)) if err != nil { // exitWithError(err) fmt.Println(" err", err) } total.Add(n) - }() + }) } wg.Wait() } diff --git a/cmd/promtool/unittest.go b/cmd/promtool/unittest.go index 7e3db94501..c9278d8a46 100644 --- a/cmd/promtool/unittest.go +++ b/cmd/promtool/unittest.go @@ -255,6 +255,7 @@ func (tg *testGroup) test(testname string, evalInterval time.Duration, groupOrde Context: context.Background(), NotifyFunc: func(context.Context, string, ...*rules.Alert) {}, Logger: promslog.NewNopLogger(), + Parser: tg.parser, } m := rules.NewManager(opts) groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, nil, ignoreUnknownFields, ruleFiles...) 
diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 162730d9aa..8a49005100 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -1562,11 +1562,9 @@ func TestConfigReloadAndShutdownRace(t *testing.T) { discoveryManager.updatert = 100 * time.Millisecond var wgDiscovery sync.WaitGroup - wgDiscovery.Add(1) - go func() { + wgDiscovery.Go(func() { discoveryManager.Run() - wgDiscovery.Done() - }() + }) time.Sleep(time.Millisecond * 200) var wgBg sync.WaitGroup @@ -1588,11 +1586,9 @@ func TestConfigReloadAndShutdownRace(t *testing.T) { discoveryManager.ApplyConfig(c) delete(c, "prometheus") - wgBg.Add(1) - go func() { + wgBg.Go(func() { discoveryManager.ApplyConfig(c) - wgBg.Done() - }() + }) mgrCancel() wgDiscovery.Wait() diff --git a/docs/querying/functions.md b/docs/querying/functions.md index 3a9b7025f8..68a003359d 100644 --- a/docs/querying/functions.md +++ b/docs/querying/functions.md @@ -433,6 +433,23 @@ and is therefore flagged by an info-level annotation reading `input to histogram_quantile needed to be fixed for monotonicity`. If you encounter this annotation, you should find and remove the source of the invalid data. +## `histogram_quantiles()` + +**This function has to be enabled via the [feature +flag](../feature_flags.md#experimental-promql-functions) +`--enable-feature=promql-experimental-functions`.** + +`histogram_quantiles(v instant-vector, quantile_label string, φ_1 scalar, φ_2 scalar, ...)` calculates multiple (between 1 and 10) φ-quantiles (0 ≤ +φ ≤ 1) from a [classic +histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) or from +a native histogram. Quantile calculation works the same way as in `histogram_quantile()`. +The second argument (a string) specifies the label name that is used to identify different quantiles in the query result. 
+``` +histogram_quantiles(sum(rate(foo[1m])), "quantile", 0.9, 0.99) +# => {quantile="0.9"} 123 + {quantile="0.99"} 128 +``` + ## `histogram_stddev()` and `histogram_stdvar()` `histogram_stddev(v instant-vector)` returns the estimated standard deviation diff --git a/promql/engine.go b/promql/engine.go index 3596501631..d2480fa8bc 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1214,6 +1214,9 @@ type EvalNodeHelper struct { // funcHistogramQuantile and funcHistogramFraction for classic histograms. signatureToMetricWithBuckets map[string]*metricWithBuckets nativeHistogramSamples []Sample + // funcHistogramQuantiles for histograms. + quantileStrs map[float64]string + signatureToLabelsWithQuantile map[string]map[float64]labels.Labels lb *labels.Builder lblBuf []byte @@ -1305,6 +1308,35 @@ func (enh *EvalNodeHelper) resetHistograms(inVec Vector, arg parser.Expr) annota return annos } +func (enh *EvalNodeHelper) getOrCreateLblsWithQuantile(lbls labels.Labels, quantileLabel string, q float64) labels.Labels { + if enh.signatureToLabelsWithQuantile == nil { + enh.signatureToLabelsWithQuantile = make(map[string]map[float64]labels.Labels) + } + + enh.lblBuf = lbls.Bytes(enh.lblBuf) + cachedLbls, ok := enh.signatureToLabelsWithQuantile[string(enh.lblBuf)] + if !ok { + cachedLbls = make(map[float64]labels.Labels, len(enh.quantileStrs)) + enh.signatureToLabelsWithQuantile[string(enh.lblBuf)] = cachedLbls + } + + cachedLblsWithQuantile, ok := cachedLbls[q] + if !ok { + quantileStr := "NaN" + if !math.IsNaN(q) { + // Cannot do map lookup by NaN key. + quantileStr = enh.quantileStrs[q] + } + cachedLblsWithQuantile = labels.NewBuilder(lbls). + Set(quantileLabel, quantileStr). + Labels() + + cachedLbls[q] = cachedLblsWithQuantile + } + + return cachedLblsWithQuantile +} + // rangeEval evaluates the given expressions, and then for each step calls // the given funcCall with the values computed for each expression at that // step. 
The return value is the combination into time series of all the @@ -4332,7 +4364,7 @@ func detectHistogramStatsDecoding(expr parser.Expr) { // further up (the latter wouldn't make sense, // but no harm in detecting it). n.SkipHistogramBuckets = true - case "histogram_quantile", "histogram_fraction": + case "histogram_quantile", "histogram_quantiles", "histogram_fraction": // If we ever see a function that needs the // whole histogram, we will not skip the // buckets. diff --git a/promql/engine_test.go b/promql/engine_test.go index 0feedf3552..51436d349f 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -94,11 +94,9 @@ func TestQueryConcurrency(t *testing.T) { var wg sync.WaitGroup for range maxConcurrency { q := engine.NewTestQuery(f) - wg.Add(1) - go func() { + wg.Go(func() { q.Exec(ctx) - wg.Done() - }() + }) select { case <-processing: // Expected. @@ -108,11 +106,9 @@ func TestQueryConcurrency(t *testing.T) { } q := engine.NewTestQuery(f) - wg.Add(1) - go func() { + wg.Go(func() { q.Exec(ctx) - wg.Done() - }() + }) select { case <-processing: diff --git a/promql/functions.go b/promql/functions.go index 2cb90a9b6c..546f94df12 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -1720,8 +1720,8 @@ func funcHistogramQuantile(vectorVals []Vector, _ Matrix, args parser.Expression inVec := vectorVals[1] var annos annotations.Annotations - if math.IsNaN(q) || q < 0 || q > 1 { - annos.Add(annotations.NewInvalidQuantileWarning(q, args[0].PositionRange())) + if err := validateQuantile(q, args[0]); err != nil { + annos.Add(err) } annos.Merge(enh.resetHistograms(inVec, args[1])) @@ -1770,6 +1770,89 @@ func funcHistogramQuantile(vectorVals []Vector, _ Matrix, args parser.Expression return enh.Out, annos } +func validateQuantile(q float64, arg parser.Expr) error { + if math.IsNaN(q) || q < 0 || q > 1 { + return annotations.NewInvalidQuantileWarning(q, arg.PositionRange()) + } + return nil +} + +// === histogram_quantiles(Vector 
parser.ValueTypeVector, label parser.ValueTypeString, q0 parser.ValueTypeScalar, qs parser.ValueTypeScalar...) (Vector, Annotations) === +func funcHistogramQuantiles(vectorVals []Vector, _ Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + var ( + inVec = vectorVals[0] + quantileLabel = args[1].(*parser.StringLiteral).Val + numQuantiles = len(vectorVals[2:]) + qs = make([]float64, 0, numQuantiles) + + annos annotations.Annotations + ) + + if enh.quantileStrs == nil { + enh.quantileStrs = make(map[float64]string, numQuantiles) + } + for i := 2; i < len(vectorVals); i++ { + q := vectorVals[i][0].F + + if err := validateQuantile(q, args[i]); err != nil { + annos.Add(err) + } + + if _, ok := enh.quantileStrs[q]; !ok { + enh.quantileStrs[q] = labels.FormatOpenMetricsFloat(q) + } + qs = append(qs, q) + } + + annos.Merge(enh.resetHistograms(inVec, args[0])) + + for _, q := range qs { + // Deal with the native histograms. + for _, sample := range enh.nativeHistogramSamples { + if sample.H == nil { + // Native histogram conflicts with classic histogram at the same timestamp, ignore. + continue + } + if !enh.enableDelayedNameRemoval { + sample.Metric = sample.Metric.DropReserved(schema.IsMetadataLabel) + } + hq, hqAnnos := HistogramQuantile(q, sample.H, sample.Metric.Get(model.MetricNameLabel), args[0].PositionRange()) + annos.Merge(hqAnnos) + enh.Out = append(enh.Out, Sample{ + Metric: enh.getOrCreateLblsWithQuantile(sample.Metric, quantileLabel, q), + F: hq, + DropName: true, + }) + } + + // Deal with classic histograms that have already been filtered for conflicting native histograms. 
+ for _, mb := range enh.signatureToMetricWithBuckets { + if len(mb.buckets) > 0 { + hq, forcedMonotonicity, _, minBucket, maxBucket, maxDiff := BucketQuantile(q, mb.buckets) + if forcedMonotonicity { + metricName := "" + if enh.enableDelayedNameRemoval { + metricName = getMetricName(mb.metric) + } + annos.Add(annotations.NewHistogramQuantileForcedMonotonicityInfo(metricName, args[0].PositionRange(), enh.Ts, minBucket, maxBucket, maxDiff)) + } + + if !enh.enableDelayedNameRemoval { + mb.metric = mb.metric.DropReserved(schema.IsMetadataLabel) + } + + enh.Out = append(enh.Out, Sample{ + Metric: enh.getOrCreateLblsWithQuantile(mb.metric, quantileLabel, q), + F: hq, + DropName: true, + }) + } + } + } + + return enh.Out, annos +} + // pickFirstSampleIndex returns the index of the last sample before // or at the range start, or 0 if none exist before the range start. // If the vector selector is not anchored, it always returns 0, true. @@ -2100,6 +2183,7 @@ var FunctionCalls = map[string]FunctionCall{ "histogram_count": funcHistogramCount, "histogram_fraction": funcHistogramFraction, "histogram_quantile": funcHistogramQuantile, + "histogram_quantiles": funcHistogramQuantiles, "histogram_sum": funcHistogramSum, "histogram_stddev": funcHistogramStdDev, "histogram_stdvar": funcHistogramStdVar, diff --git a/promql/parser/functions.go b/promql/parser/functions.go index c7c7332305..180a255ab0 100644 --- a/promql/parser/functions.go +++ b/promql/parser/functions.go @@ -205,6 +205,13 @@ var Functions = map[string]*Function{ ArgTypes: []ValueType{ValueTypeScalar, ValueTypeVector}, ReturnType: ValueTypeVector, }, + "histogram_quantiles": { + Name: "histogram_quantiles", + ArgTypes: []ValueType{ValueTypeVector, ValueTypeString, ValueTypeScalar, ValueTypeScalar}, + Variadic: 9, + ReturnType: ValueTypeVector, + Experimental: true, + }, "double_exponential_smoothing": { Name: "double_exponential_smoothing", ArgTypes: []ValueType{ValueTypeMatrix, ValueTypeScalar, ValueTypeScalar}, diff 
--git a/promql/promqltest/testdata/histograms.test b/promql/promqltest/testdata/histograms.test index 436390ee41..db7d5de230 100644 --- a/promql/promqltest/testdata/histograms.test +++ b/promql/promqltest/testdata/histograms.test @@ -598,6 +598,40 @@ eval instant at 50m histogram_quantile(1, testhistogram3_bucket) {start="positive"} 1 {start="negative"} -0.1 +eval instant at 50m histogram_quantiles(testhistogram3, "q", 0, 0.25, 0.5, 0.75, 1) + expect no_warn + {q="0.0", start="positive"} 0 + {q="0.0", start="negative"} -0.25 + {q="0.25", start="positive"} 0.055 + {q="0.25", start="negative"} -0.225 + {q="0.5", start="positive"} 0.125 + {q="0.5", start="negative"} -0.2 + {q="0.75", start="positive"} 0.45 + {q="0.75", start="negative"} -0.15 + {q="1.0", start="positive"} 1 + {q="1.0", start="negative"} -0.1 + +eval instant at 50m histogram_quantiles(testhistogram3_bucket, "q", 0, 0.25, 0.5, 0.75, 1) + expect no_warn + {q="0.0", start="positive"} 0 + {q="0.0", start="negative"} -0.25 + {q="0.25", start="positive"} 0.055 + {q="0.25", start="negative"} -0.225 + {q="0.5", start="positive"} 0.125 + {q="0.5", start="negative"} -0.2 + {q="0.75", start="positive"} 0.45 + {q="0.75", start="negative"} -0.15 + {q="1.0", start="positive"} 1 + {q="1.0", start="negative"} -0.1 + +# Break label set uniqueness. + +eval instant at 50m histogram_quantiles(testhistogram3, "start", 0, 0.25, 0.5, 0.75, 1) + expect fail + +eval instant at 50m histogram_quantiles(testhistogram3_bucket, "start", 0, 0.25, 0.5, 0.75, 1) + expect fail + # Quantile too low. 
eval instant at 50m histogram_quantile(-0.1, testhistogram) @@ -610,6 +644,16 @@ eval instant at 50m histogram_quantile(-0.1, testhistogram_bucket) {start="positive"} -Inf {start="negative"} -Inf +eval instant at 50m histogram_quantiles(testhistogram, "q", -0.1) + expect warn + {q="-0.1", start="positive"} -Inf + {q="-0.1", start="negative"} -Inf + +eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", -0.1) + expect warn + {q="-0.1", start="positive"} -Inf + {q="-0.1", start="negative"} -Inf + # Quantile too high. eval instant at 50m histogram_quantile(1.01, testhistogram) @@ -622,6 +666,16 @@ eval instant at 50m histogram_quantile(1.01, testhistogram_bucket) {start="positive"} +Inf {start="negative"} +Inf +eval instant at 50m histogram_quantiles(testhistogram, "q", 1.01) + expect warn + {q="1.01", start="positive"} +Inf + {q="1.01", start="negative"} +Inf + +eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", 1.01) + expect warn + {q="1.01", start="positive"} +Inf + {q="1.01", start="negative"} +Inf + # Quantile invalid. eval instant at 50m histogram_quantile(NaN, testhistogram) @@ -634,9 +688,22 @@ eval instant at 50m histogram_quantile(NaN, testhistogram_bucket) {start="positive"} NaN {start="negative"} NaN +eval instant at 50m histogram_quantiles(testhistogram, "q", NaN) + expect warn + {q="NaN", start="positive"} NaN + {q="NaN", start="negative"} NaN + +eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", NaN) + expect warn + {q="NaN", start="positive"} NaN + {q="NaN", start="negative"} NaN + eval instant at 50m histogram_quantile(NaN, non_existent) expect warn msg: PromQL warning: quantile value should be between 0 and 1, got NaN +eval instant at 50m histogram_quantiles(non_existent, "q", NaN) + expect warn msg: PromQL warning: quantile value should be between 0 and 1, got NaN + # Quantile value in lowest bucket. 
eval instant at 50m histogram_quantile(0, testhistogram) @@ -967,6 +1034,12 @@ eval instant at 50m histogram_quantile(0.99, nonmonotonic_bucket) expect info {} 979.75 +eval instant at 50m histogram_quantiles(nonmonotonic_bucket, "q", 0.01, 0.5, 0.99) + expect info + {q="0.01"} 0.0045 + {q="0.5"} 8.5 + {q="0.99"} 979.75 + # Buckets with different representations of the same upper bound. eval instant at 50m histogram_quantile(0.5, rate(mixed_bucket[10m])) {instance="ins1", job="job1"} 0.15 @@ -1002,9 +1075,15 @@ load_with_nhcb 5m eval instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*_bucket"}) expect fail +eval instant at 50m histogram_quantiles({__name__=~"request_duration_seconds\\d*_bucket"}, "q", 0.99) + expect fail + eval instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*"}) expect fail +eval instant at 50m histogram_quantiles({__name__=~"request_duration_seconds\\d*"}, "q", 0.99) + expect fail + # Histogram with constant buckets. load_with_nhcb 1m const_histogram_bucket{le="0.0"} 1 1 1 1 1 @@ -1066,7 +1145,7 @@ eval instant at 10m histogram_sum(increase(histogram_with_reset[15m])) clear -# Test histogram_quantile and histogram_fraction with conflicting classic and native histograms. +# Test histogram_quantile(s) and histogram_fraction with conflicting classic and native histograms. load 1m series{host="a"} {{schema:0 sum:5 count:4 buckets:[9 2 1]}} series{host="a", le="0.1"} 2 @@ -1081,6 +1160,11 @@ eval instant at 0 histogram_quantile(0.8, series) expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series" # Should return no results. +eval instant at 0 histogram_quantiles(series, "q", 0.1, 0.2) + expect no_info + expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series" + # Should return no results. 
+ eval instant at 0 histogram_fraction(-Inf, 1, series) expect no_info expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series" diff --git a/promql/promqltest/testdata/native_histograms.test b/promql/promqltest/testdata/native_histograms.test index 3b497e5ff4..40789b295a 100644 --- a/promql/promqltest/testdata/native_histograms.test +++ b/promql/promqltest/testdata/native_histograms.test @@ -55,6 +55,10 @@ eval instant at 1m histogram_quantile(0.5, single_histogram) expect no_info {} 1.414213562373095 +eval instant at 1m histogram_quantiles(single_histogram, "q", 0.5) + expect no_info + {q="0.5"} 1.414213562373095 + clear # Repeat the same histogram 10 times. @@ -1605,6 +1609,11 @@ eval instant at 1m histogram_quantile(0.81, histogram_nan) {case="100% NaNs"} NaN {case="20% NaNs"} NaN +eval instant at 1m histogram_quantiles(histogram_nan, "q", 0.81) + expect info msg: PromQL info: input to histogram_quantile has NaN observations, result is NaN for metric name "histogram_nan" + {case="100% NaNs", q="0.81"} NaN + {case="20% NaNs", q="0.81"} NaN + eval instant at 1m histogram_quantile(0.8, histogram_nan{case="100% NaNs"}) expect info msg: PromQL info: input to histogram_quantile has NaN observations, result is NaN for metric name "histogram_nan" {case="100% NaNs"} NaN @@ -1891,6 +1900,9 @@ eval instant at 1m histogram_quantile(0.5, myHistogram2) eval instant at 1m histogram_quantile(0.5, mixedHistogram) expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram" +eval instant at 1m histogram_quantiles(mixedHistogram, "q", 0.5) + expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram" + clear # A counter reset only in a bucket. 
Sub-queries still need to detect @@ -1960,6 +1972,9 @@ eval instant at 1m histogram_count(histogram unless histogram_quantile(0.5, hist eval instant at 1m histogram_quantile(0.5, histogram unless histogram_count(histogram) == 0) {} 3.1748021039363987 +eval instant at 1m histogram_quantiles(histogram unless histogram_count(histogram) == 0, "q", 0.5) + {q="0.5"} 3.1748021039363987 + clear # Regression test for: diff --git a/rules/manager_test.go b/rules/manager_test.go index 1b9f4be7d5..19c815e50c 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -49,6 +49,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/teststorage" prom_testutil "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestMain(m *testing.M) { @@ -2010,306 +2011,306 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { func TestAsyncRuleEvaluation(t *testing.T) { t.Run("synchronous evaluation with independent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) - expectedRuleCount := 6 - expectedSampleCount := 4 + expectedRuleCount := 6 + expectedSampleCount := 4 - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Nil(t, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Nil(t, order) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is higher than the sum of rule durations (group overhead). - require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is higher than the sum of rule durations (group overhead). 
+ require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) }) t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 6 - expectedSampleCount := 4 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 6 + expectedSampleCount := 4 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. 
- require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. 
+ opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order (isn't affected by concurrency settings) - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, - }, order) + // Expected evaluation order (isn't affected by concurrency settings) + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. - require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. 
+ require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) + + start := time.Now() + + DefaultEvalIterationFunc(ctx, group, start) + + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is less than the sum of rule durations + require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) + }) + + t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + }) + + t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 8 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, + {0, 4}, + {1, 2, 3, 5, 6, 7}, }, order) - // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. - require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) - // Some rules should execute concurrently so should complete quicker. 
- require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is less than the sum of rule durations - require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } - }) - - t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - - for _, group := range groups { - require.Len(t, group.rules, ruleCount) - - start := time.Now() - group.Eval(ctx, start) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. + require.EqualValues(t, 6, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. 
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } - }) - - t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 8 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } - - start := time.Now() - - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 4}, - {1, 2, 3, 5, 6, 7}, - }, order) - - group.Eval(ctx, start) - - // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. - require.EqualValues(t, 6, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. 
- require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } - start := time.Now() + start := time.Now() - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1}, - {2}, - {3}, - {4, 5, 6}, - }, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1}, + {2}, + {3}, + {4, 5, 6}, + }, order) - group.Eval(ctx, start) + group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, 3, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) } @@ -2472,11 +2473,9 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { // Evaluate groups concurrently (like they normally do). 
var wg sync.WaitGroup for _, group := range groups { - wg.Add(1) - go func() { + wg.Go(func() { group.Eval(ctx, time.Now()) - wg.Done() - }() + }) } wg.Wait() diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 3dac96f6a0..9fdd750692 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -225,7 +225,8 @@ func (h *writeHandler) appendV1Samples(app storage.Appender, ss []prompb.Sample, if err != nil { if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(err, storage.ErrOutOfBounds) || - errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) || + errors.Is(err, storage.ErrTooOldSample) { h.logger.Error("Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) } return err @@ -247,7 +248,8 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist // a note indicating its inclusion in the future. if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(err, storage.ErrOutOfBounds) || - errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) || + errors.Is(err, storage.ErrTooOldSample) { h.logger.Error("Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) } return err @@ -409,7 +411,8 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // a note indicating its inclusion in the future. if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(err, storage.ErrOutOfBounds) || - errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) || + errors.Is(err, storage.ErrTooOldSample) { // TODO(bwplotka): Not too spammy log? 
h.logger.Error("Out of order histogram from remote write", "err", err.Error(), "series", ls.String(), "timestamp", hp.Timestamp) badRequestErrs = append(badRequestErrs, fmt.Errorf("%w for series %v", err, ls.String())) diff --git a/storage/remote/write_otlp_handler.go b/storage/remote/write_otlp_handler.go index b8888baeb8..6cb4a0fff0 100644 --- a/storage/remote/write_otlp_handler.go +++ b/storage/remote/write_otlp_handler.go @@ -176,7 +176,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case err == nil: - case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): + case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample): // Indicated an out of order sample is a bad request to prevent retries. http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 1a046ea00a..a87c2602cd 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -111,10 +111,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu } func (c *chunkWriteQueue) start() { - c.workerWg.Add(1) - go func() { - defer c.workerWg.Done() - + c.workerWg.Go(func() { for { job, ok := c.jobs.pop() if !ok { @@ -123,7 +120,7 @@ func (c *chunkWriteQueue) start() { c.processJob(job) } - }() + }) c.isRunningMtx.Lock() c.isRunning = true diff --git a/tsdb/chunks/queue_test.go b/tsdb/chunks/queue_test.go index 377a8181ff..2e3fff59a8 100644 --- a/tsdb/chunks/queue_test.go +++ b/tsdb/chunks/queue_test.go @@ -269,34 +269,26 @@ func TestQueuePushPopManyGoroutines(t *testing.T) { readersWG := sync.WaitGroup{} for range readGoroutines { - readersWG.Add(1) - - go func() { - defer readersWG.Done() - + readersWG.Go(func() { for 
j, ok := queue.pop(); ok; j, ok = queue.pop() { refsMx.Lock() refs[j.seriesRef] = true refsMx.Unlock() } - }() + }) } id := atomic.Uint64{} writersWG := sync.WaitGroup{} for range writeGoroutines { - writersWG.Add(1) - - go func() { - defer writersWG.Done() - + writersWG.Go(func() { for range writes { ref := id.Inc() require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(ref)})) } - }() + }) } // Wait until all writes are done. diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index afe15a5f31..44a0921eec 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1717,10 +1717,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { // Ingest sparse histograms. for _, ah := range allSparseSeries { var ( @@ -1743,7 +1740,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { sparseULIDs, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) require.NoError(t, err) require.Len(t, sparseULIDs, 1) - }() + }) wg.Add(1) go func(c testcase) { diff --git a/tsdb/db.go b/tsdb/db.go index 1d73628bfd..a4a4a77f3c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -929,9 +929,13 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn for _, tmpDir := range []string{walDir, dir} { // Remove tmp dirs. - if err := removeBestEffortTmpDirs(l, tmpDir); err != nil { + if err := tsdbutil.RemoveTmpDirs(l, tmpDir, isTmpDir); err != nil { return nil, fmt.Errorf("remove tmp dirs: %w", err) } + // Remove any temporary checkpoints that might have been interrupted during creation. 
+ if err := wlog.DeleteTempCheckpoints(l, tmpDir); err != nil { + return nil, fmt.Errorf("delete temp checkpoints: %w", err) + } } db := &DB{ @@ -1115,26 +1119,6 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn return db, nil } -func removeBestEffortTmpDirs(l *slog.Logger, dir string) error { - files, err := os.ReadDir(dir) - if os.IsNotExist(err) { - return nil - } - if err != nil { - return err - } - for _, f := range files { - if isTmpDir(f) { - if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil { - l.Error("failed to delete tmp block dir", "dir", filepath.Join(dir, f.Name()), "err", err) - continue - } - l.Info("Found and deleted tmp block dir", "dir", filepath.Join(dir, f.Name())) - } - } - return nil -} - // StartTime implements the Storage interface. func (db *DB) StartTime() (int64, error) { db.mtx.RLock() @@ -2538,8 +2522,7 @@ func isBlockDir(fi fs.DirEntry) bool { return err == nil } -// isTmpDir returns true if the given file-info contains a block ULID, a checkpoint prefix, -// or a chunk snapshot prefix and a tmp extension. +// isTmpDir returns true if the given file-info contains a block ULID, or a chunk snapshot prefix and a tmp extension. 
func isTmpDir(fi fs.DirEntry) bool { if !fi.IsDir() { return false @@ -2548,9 +2531,6 @@ func isTmpDir(fi fs.DirEntry) bool { fn := fi.Name() ext := filepath.Ext(fn) if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy { - if strings.HasPrefix(fn, wlog.CheckpointPrefix) { - return true - } if strings.HasPrefix(fn, chunkSnapshotPrefix) { return true } diff --git a/tsdb/db_append_v2_test.go b/tsdb/db_append_v2_test.go index e6bcfb696d..8083829537 100644 --- a/tsdb/db_append_v2_test.go +++ b/tsdb/db_append_v2_test.go @@ -1835,6 +1835,7 @@ func TestBlockRanges_AppendV2(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT)) db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1 @@ -1851,21 +1852,16 @@ func TestBlockRanges_AppendV2(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 2 { - break - } - time.Sleep(100 * time.Millisecond) - } - require.Len(t, db.Blocks(), 2, "no new block created after the set timeout") + require.NoError(t, db.Compact(ctx)) + blocks := db.Blocks() + require.Len(t, blocks, 2, "no new block after compaction") - require.LessOrEqual(t, db.Blocks()[1].Meta().MinTime, db.Blocks()[0].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta()) + require.GreaterOrEqual(t, blocks[1].Meta().MinTime, blocks[0].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[0].Meta(), blocks[1].Meta()) // Test that wal records are skipped when an existing block covers the same time ranges // and compaction doesn't create an overlapping block. 
app = db.AppenderV2(ctx) - db.DisableCompactions() _, err = app.Append(0, lbl, 0, secondBlockMaxt+1, rand.Float64(), nil, nil, storage.AOptions{}) require.NoError(t, err) _, err = app.Append(0, lbl, 0, secondBlockMaxt+2, rand.Float64(), nil, nil, storage.AOptions{}) @@ -1882,6 +1878,7 @@ func TestBlockRanges_AppendV2(t *testing.T) { db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() defer db.Close() require.Len(t, db.Blocks(), 3, "db doesn't include expected number of blocks") @@ -1891,17 +1888,12 @@ func TestBlockRanges_AppendV2(t *testing.T) { _, err = app.Append(0, lbl, 0, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64(), nil, nil, storage.AOptions{}) // Trigger a compaction require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 4 { - break - } - time.Sleep(100 * time.Millisecond) - } + require.NoError(t, db.Compact(ctx)) + blocks = db.Blocks() + require.Len(t, blocks, 4, "no new block after compaction") - require.Len(t, db.Blocks(), 4, "no new block created after the set timeout") - - require.LessOrEqual(t, db.Blocks()[3].Meta().MinTime, db.Blocks()[2].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) + require.GreaterOrEqual(t, blocks[3].Meta().MinTime, blocks[2].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[2].Meta(), blocks[3].Meta()) } // TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk. 
diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 18e969f952..3f2861d633 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2408,6 +2408,7 @@ func TestBlockRanges(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT)) db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1 @@ -2424,21 +2425,16 @@ func TestBlockRanges(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 2 { - break - } - time.Sleep(100 * time.Millisecond) - } - require.Len(t, db.Blocks(), 2, "no new block created after the set timeout") + require.NoError(t, db.Compact(ctx)) + blocks := db.Blocks() + require.Len(t, blocks, 2, "no new block after compaction") - require.LessOrEqual(t, db.Blocks()[1].Meta().MinTime, db.Blocks()[0].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta()) + require.GreaterOrEqual(t, blocks[1].Meta().MinTime, blocks[0].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[0].Meta(), blocks[1].Meta()) // Test that wal records are skipped when an existing block covers the same time ranges // and compaction doesn't create an overlapping block. 
app = db.Appender(ctx) - db.DisableCompactions() _, err = app.Append(0, lbl, secondBlockMaxt+1, rand.Float64()) require.NoError(t, err) _, err = app.Append(0, lbl, secondBlockMaxt+2, rand.Float64()) @@ -2455,6 +2451,7 @@ func TestBlockRanges(t *testing.T) { db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() defer db.Close() require.Len(t, db.Blocks(), 3, "db doesn't include expected number of blocks") @@ -2464,17 +2461,12 @@ func TestBlockRanges(t *testing.T) { _, err = app.Append(0, lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 4 { - break - } - time.Sleep(100 * time.Millisecond) - } + require.NoError(t, db.Compact(ctx)) + blocks = db.Blocks() + require.Len(t, blocks, 4, "no new block after compaction") - require.Len(t, db.Blocks(), 4, "no new block created after the set timeout") - - require.LessOrEqual(t, db.Blocks()[3].Meta().MinTime, db.Blocks()[2].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) + require.GreaterOrEqual(t, blocks[3].Meta().MinTime, blocks[2].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[2].Meta(), blocks[3].Meta()) } // TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk. diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index 082d756e60..61b2eecf4e 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -1334,13 +1334,11 @@ func TestDataMissingOnQueryDuringCompaction_AppenderV2(t *testing.T) { require.NoError(t, err) var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // Compacting head while the querier spans the compaction time. require.NoError(t, db.Compact(ctx)) require.NotEmpty(t, db.Blocks()) - }() + }) // Give enough time for compaction to finish. 
// We expect it to be blocked until querier is closed. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 7b8ae0ecbd..142fbc18e7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3259,12 +3259,10 @@ func testHeadSeriesChunkRace(t *testing.T) { defer q.Close() var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { h.updateMinMaxTime(20, 25) h.gc() - }() + }) ss := q.Select(context.Background(), false, nil, matcher) for ss.Next() { } @@ -3748,13 +3746,11 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { s := ss.At() var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // Compacting head while the querier spans the compaction time. require.NoError(t, db.Compact(ctx)) require.NotEmpty(t, db.Blocks()) - }() + }) // Give enough time for compaction to finish. // We expect it to be blocked until querier is closed. @@ -3812,13 +3808,11 @@ func TestDataMissingOnQueryDuringCompaction(t *testing.T) { require.NoError(t, err) var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // Compacting head while the querier spans the compaction time. require.NoError(t, db.Compact(ctx)) require.NotEmpty(t, db.Blocks()) - }() + }) // Give enough time for compaction to finish. // We expect it to be blocked until querier is closed. diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 31b93f850d..c0bf213c45 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -956,7 +956,7 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int, } if p.At() == h.at() { indexes = append(indexes, h.popIndex()) - } else if err := h.next(); err != nil { + } else if err := h.seekHead(p.At()); err != nil { return nil, err } } @@ -999,20 +999,18 @@ func (h *postingsWithIndexHeap) popIndex() int { // at provides the storage.SeriesRef where root Postings is pointing at this moment. 
func (h postingsWithIndexHeap) at() storage.SeriesRef { return h[0].p.At() } -// next performs the Postings.Next() operation on the root of the heap, performing the related operation on the heap -// and conveniently returning the result of calling Postings.Err() if the result of calling Next() was false. -// If Next() succeeds, heap is fixed to move the root to its new position, according to its Postings.At() value. -// If Next() returns fails and there's no error reported by Postings.Err(), then root is marked as removed and heap is fixed. -func (h *postingsWithIndexHeap) next() error { +// seekHead performs the Postings.Seek() operation on the root of the heap. +// If the root is exhausted or fails, it is removed from the heap. +func (h *postingsWithIndexHeap) seekHead(val storage.SeriesRef) error { pi := (*h)[0] - next := pi.p.Next() + next := pi.p.Seek(val) if next { heap.Fix(h, 0) return nil } if err := pi.p.Err(); err != nil { - return fmt.Errorf("postings %d: %w", pi.index, err) + return fmt.Errorf("seek postings %d: %w", pi.index, err) } h.popIndex() return nil diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 77b43f76ab..5c67a2da6d 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1192,7 +1192,7 @@ func (p *postingsFailingAfterNthCall) Err() error { } func TestPostingsWithIndexHeap(t *testing.T) { - t.Run("iterate", func(t *testing.T) { + t.Run("seekHead", func(t *testing.T) { h := postingsWithIndexHeap{ {index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})}, {index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})}, @@ -1205,7 +1205,7 @@ func TestPostingsWithIndexHeap(t *testing.T) { for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} { require.Equal(t, expected, h.at()) - require.NoError(t, h.next()) + require.NoError(t, h.seekHead(h.at()+1)) } require.True(t, h.empty()) }) @@ -1223,7 +1223,7 @@ func TestPostingsWithIndexHeap(t *testing.T) { for _, expected := range 
[]storage.SeriesRef{1, 5, 10, 20} { require.Equal(t, expected, h.at()) - require.NoError(t, h.next()) + require.NoError(t, h.seekHead(h.at()+1)) } require.Equal(t, storage.SeriesRef(25), h.at()) node := heap.Pop(&h).(postingsWithIndex) diff --git a/tsdb/isolation_test.go b/tsdb/isolation_test.go index f2671024e8..2b2e1a6487 100644 --- a/tsdb/isolation_test.go +++ b/tsdb/isolation_test.go @@ -88,10 +88,7 @@ func BenchmarkIsolation(b *testing.B) { start := make(chan struct{}) for range goroutines { - wg.Add(1) - - go func() { - defer wg.Done() + wg.Go(func() { <-start for b.Loop() { @@ -99,7 +96,7 @@ func BenchmarkIsolation(b *testing.B) { iso.closeAppend(appendID) } - }() + }) } b.ResetTimer() @@ -118,10 +115,7 @@ func BenchmarkIsolationWithState(b *testing.B) { start := make(chan struct{}) for range goroutines { - wg.Add(1) - - go func() { - defer wg.Done() + wg.Go(func() { <-start for b.Loop() { @@ -129,7 +123,7 @@ func BenchmarkIsolationWithState(b *testing.B) { iso.closeAppend(appendID) } - }() + }) } readers := goroutines / 100 @@ -138,17 +132,14 @@ func BenchmarkIsolationWithState(b *testing.B) { } for g := 0; g < readers; g++ { - wg.Add(1) - - go func() { - defer wg.Done() + wg.Go(func() { <-start for b.Loop() { s := iso.State(math.MinInt64, math.MaxInt64) s.Close() } - }() + }) } b.ResetTimer() diff --git a/tsdb/label_values_bench_test.go b/tsdb/label_values_bench_test.go new file mode 100644 index 0000000000..1e55cf80c0 --- /dev/null +++ b/tsdb/label_values_bench_test.go @@ -0,0 +1,86 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/wlog" +) + +// BenchmarkLabelValues_SlowPath benchmarks the performance of LabelValues when the matcher +// is far ahead of the candidate posting list. This reproduces the performance regression +// described in #14551 where dense candidates caused O(N) iteration instead of O(log N) seeking. +func BenchmarkLabelValues_SlowPath(b *testing.B) { + // Create a head with some data. + opts := DefaultHeadOptions() + opts.ChunkDirRoot = b.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(b, err) + defer h.Close() + + app := h.Appender(context.Background()) + // 1. Create a large number of series for a "candidate" label (e.g. "job"). + // We want these to NOT match the target matcher, but be candidates for a different label. + // We use "job=api" and "instance=..." + // We want the interaction to be: + // LabelValues("instance", "job"="api") + // "job"="api" will have 1 series at the END. + // "instance" will have 100k series. + + // Actually, let's stick to the reproduction case: + // distinct values for "val1". + // "b"="1" matcher. + + // Create 100k series with the same label value ("common") but without the matcher label. + // This results in a single large posting list for that value, simulating a dense candidate. 
+ for i := range 100000 { + _, err := app.Append(0, labels.FromStrings("val1", "common", "extra", strconv.Itoa(i)), time.Now().UnixMilli(), 1) + require.NoError(b, err) + } + + // Create 1 series that matches the label "b=1", with a series ID greater than all previous ones. + // This forces the intersection to skip over all 100k previous candidates. + _, err = app.Append(0, labels.FromStrings("val1", "common", "b", "1"), time.Now().UnixMilli(), 1) + require.NoError(b, err) + + require.NoError(b, app.Commit()) + + ctx := context.Background() + matcher := labels.MustNewMatcher(labels.MatchEqual, "b", "1") + + // Use the correct method to access label values. + idx, err := h.Index() + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + // "val1"="common" has 100k+1 postings. + // "b=1" has 1 posting (the last one). + vals, err := idx.LabelValues(ctx, "val1", nil, matcher) + require.NoError(b, err) + require.Equal(b, []string{"common"}, vals) + } +} + +// Ensure wlog/wal needed for NewHead. +var _ = wlog.WL{} diff --git a/tsdb/tsdbutil/remove_tmp_dirs.go b/tsdb/tsdbutil/remove_tmp_dirs.go new file mode 100644 index 0000000000..a95db3159e --- /dev/null +++ b/tsdb/tsdbutil/remove_tmp_dirs.go @@ -0,0 +1,45 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdbutil + +import ( + "io/fs" + "log/slog" + "os" + "path/filepath" +) + +// RemoveTmpDirs attempts to remove directories in the specified directory which match the isTmpDir predicate. +// Errors encountered during reading the directory that other than non-existence are returned. All other errors +// encountered during removal of tmp directories are logged but do not cause early termination. +func RemoveTmpDirs(l *slog.Logger, dir string, isTmpDir func(fi fs.DirEntry) bool) error { + files, err := os.ReadDir(dir) + if os.IsNotExist(err) { + return nil + } + if err != nil { + return err + } + for _, f := range files { + if isTmpDir(f) { + if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil { + l.Error("failed to delete tmp dir", "dir", filepath.Join(dir, f.Name()), "err", err) + continue + } + l.Info("Found and deleted tmp dir", "dir", filepath.Join(dir, f.Name())) + } + } + return nil +} diff --git a/tsdb/tsdbutil/remove_tmp_dirs_test.go b/tsdb/tsdbutil/remove_tmp_dirs_test.go new file mode 100644 index 0000000000..4ab282d3b3 --- /dev/null +++ b/tsdb/tsdbutil/remove_tmp_dirs_test.go @@ -0,0 +1,124 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdbutil + +import ( + "io/fs" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" +) + +func TestRemoveTmpDirs(t *testing.T) { + tests := []struct { + name string + isTmpDir func(fi fs.DirEntry) bool + setup func(t *testing.T, dir string) + expectedDirs []string // Directories that should remain after cleanup + }{ + { + name: "remove directories with tmp prefix", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir1"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir2"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir"), 0o755)) + }, + expectedDirs: []string{"normaldir"}, + }, + { + name: "remove directories with specific suffix", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "data.tmp"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "cache.tmp"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "permanent"), 0o755)) + }, + expectedDirs: []string{"permanent"}, + }, + { + name: "no temporary directories to remove", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir1"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir2"), 0o755)) + }, + expectedDirs: []string{"normaldir1", "normaldir2"}, + }, + { + name: "empty directory", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(_ *testing.T, _ string) {}, // No setup needed - directory is empty + expectedDirs: []string{}, + }, + { + name: 
"directory with files only (no directories)", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile1.txt"), []byte("test"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile2.txt"), []byte("test"), 0o644)) + }, + expectedDirs: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testDir := t.TempDir() + + if tt.setup != nil { + tt.setup(t, testDir) + } + + require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), testDir, tt.isTmpDir)) + + entries, err := os.ReadDir(testDir) + require.NoError(t, err) + + // Get actual remaining directories + var actualDirs []string + for _, entry := range entries { + if entry.IsDir() { + actualDirs = append(actualDirs, entry.Name()) + } + } + + require.ElementsMatch(t, tt.expectedDirs, actualDirs, "Remaining directories don't match expected") + }) + } +} + +func TestRemoveTmpDirs_NonExistentDirectory(t *testing.T) { + testDir := t.TempDir() + nonExistent := filepath.Join(testDir, "does_not_exist") + + require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), nonExistent, func(_ fs.DirEntry) bool { + return true + })) +} diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 6742141fbc..3a4e194fec 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "io/fs" "log/slog" "math" "os" @@ -31,6 +32,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/tsdb/tsdbutil" ) // CheckpointStats returns stats about a created checkpoint. @@ -80,8 +82,16 @@ func DeleteCheckpoints(dir string, maxIndex int) error { return errors.Join(errs...) } -// CheckpointPrefix is the prefix used for checkpoint files. 
-const CheckpointPrefix = "checkpoint." +// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files. +const checkpointTempFileSuffix = ".tmp" + +// DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory. +func DeleteTempCheckpoints(logger *slog.Logger, dir string) error { + if err := tsdbutil.RemoveTmpDirs(logger, dir, isTempDir); err != nil { + return fmt.Errorf("remove previous temporary checkpoint dirs: %w", err) + } + return nil +} // Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL. // It includes the most recent checkpoint if it exists. @@ -123,13 +133,13 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He defer sgmReader.Close() } - cpdir := checkpointDir(w.Dir(), to) - cpdirtmp := cpdir + ".tmp" - - if err := os.RemoveAll(cpdirtmp); err != nil { - return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err) + if err := DeleteTempCheckpoints(logger, w.Dir()); err != nil { + return nil, err } + cpdir := checkpointDir(w.Dir(), to) + cpdirtmp := cpdir + checkpointTempFileSuffix + if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return nil, fmt.Errorf("create checkpoint dir: %w", err) } @@ -394,8 +404,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He return stats, nil } +// checkpointPrefix is the prefix used for checkpoint files. +const checkpointPrefix = "checkpoint." 
+ func checkpointDir(dir string, i int) string { - return filepath.Join(dir, fmt.Sprintf(CheckpointPrefix+"%08d", i)) + return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i)) } type checkpointRef struct { @@ -411,13 +424,13 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { for i := range files { fi := files[i] - if !strings.HasPrefix(fi.Name(), CheckpointPrefix) { + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { continue } if !fi.IsDir() { return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name()) } - idx, err := strconv.Atoi(fi.Name()[len(CheckpointPrefix):]) + idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) if err != nil { continue } @@ -431,3 +444,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { return refs, nil } + +func isTempDir(fi fs.DirEntry) bool { + return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix) +} diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 97ca2e768d..a348239ec7 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -417,3 +417,81 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { }) require.NoError(t, err) } + +func TestCheckpointDeletesTemporaryCheckpoints(t *testing.T) { + dir := t.TempDir() + + // Create one tmp checkpoint directory + require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00001000.tmp"), 0o777)) + + w, err := New(nil, nil, dir, compression.None) + require.NoError(t, err) + defer w.Close() + + _, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1000, func(_ chunks.HeadSeriesRef) bool { return true }, 1000) + require.NoError(t, err) + + files, err := os.ReadDir(dir) + require.NoError(t, err) + + var actualDirectories []string + for _, f := range files { + if !f.IsDir() { + continue + } + actualDirectories = append(actualDirectories, f.Name()) + } + require.Equal(t, []string{"checkpoint.00001000"}, 
actualDirectories) +} + +func TestDeleteTempCheckpoints(t *testing.T) { + testCases := []struct { + name string + checkpointDirectoriesToCreate []string + expectedDirectories []string + }{ + { + name: "no tmp checkpoints", + checkpointDirectoriesToCreate: nil, + expectedDirectories: nil, + }, + { + name: "one tmp checkpoint", + checkpointDirectoriesToCreate: []string{"checkpoint.00001000.tmp"}, + expectedDirectories: nil, + }, + { + name: "many tmp checkpoints", + checkpointDirectoriesToCreate: []string{"checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000.tmp"}, + expectedDirectories: nil, + }, + { + name: "mix of tmp and regular checkpoints", + checkpointDirectoriesToCreate: []string{"checkpoint.00000001", "checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000"}, + expectedDirectories: []string{"checkpoint.00000001", "checkpoint.00002000"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + for _, fn := range tc.checkpointDirectoriesToCreate { + require.NoError(t, os.MkdirAll(filepath.Join(dir, fn), 0o777)) + } + + require.NoError(t, DeleteTempCheckpoints(promslog.NewNopLogger(), dir)) + + files, err := os.ReadDir(dir) + require.NoError(t, err) + + var actualDirectories []string + for _, f := range files { + if !f.IsDir() { + continue + } + actualDirectories = append(actualDirectories, f.Name()) + } + require.Equal(t, tc.expectedDirectories, actualDirectories) + }) + } +} diff --git a/util/treecache/treecache.go b/util/treecache/treecache.go index 32912c5a94..deb950b55a 100644 --- a/util/treecache/treecache.go +++ b/util/treecache/treecache.go @@ -265,8 +265,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr } } - tc.wg.Add(1) - go func() { + tc.wg.Go(func() { numWatchers.Inc() // Pass up zookeeper events, until the node is deleted. 
select { @@ -277,8 +276,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr case <-node.done: } numWatchers.Dec() - tc.wg.Done() - }() + }) return nil } diff --git a/web/ui/mantine-ui/src/promql/functionDocs.tsx b/web/ui/mantine-ui/src/promql/functionDocs.tsx index 4cc70a39e6..c7f744ba6f 100644 --- a/web/ui/mantine-ui/src/promql/functionDocs.tsx +++ b/web/ui/mantine-ui/src/promql/functionDocs.tsx @@ -1543,6 +1543,33 @@ const funcDocs: Record = {

), + histogram_quantiles: ( + <> +

+ + This function has to be enabled via the{" "} + feature flag + --enable-feature=promql-experimental-functions. + +

+ +

+ histogram_quantiles(v instant-vector, quantile_label string, φ_1 scalar, φ_2 scalar, ...){" "} + calculates multiple (between 1 and 10) φ-quantiles (0 ≤ φ ≤ 1) from a{" "} + classic histogram or from a native + histogram. Quantile calculation works the same way as in histogram_quantile(). The second argument + (a string) specifies the label name that is used to identify different quantiles in the query result. +

+ +
+        
+          histogram_quantiles(sum(rate(foo[1m])), "quantile", 0.9, 0.99) # => {"{"}quantile="0.9"
+          {"}"} 123
+          {"{"}quantile="0.99"{"}"} 128
+        
+      
+ + ), histogram_stddev: ( <>

diff --git a/web/ui/mantine-ui/src/promql/functionSignatures.ts b/web/ui/mantine-ui/src/promql/functionSignatures.ts index da21a2d4aa..837a271dce 100644 --- a/web/ui/mantine-ui/src/promql/functionSignatures.ts +++ b/web/ui/mantine-ui/src/promql/functionSignatures.ts @@ -69,6 +69,12 @@ export const functionSignatures: Record = { variadic: 0, returnType: valueType.vector, }, + histogram_quantiles: { + name: "histogram_quantiles", + argTypes: [valueType.vector, valueType.string, valueType.scalar, valueType.scalar], + variadic: 9, + returnType: valueType.vector, + }, histogram_stddev: { name: "histogram_stddev", argTypes: [valueType.vector], diff --git a/web/ui/module/codemirror-promql/src/complete/promql.terms.ts b/web/ui/module/codemirror-promql/src/complete/promql.terms.ts index 3670fffff7..68d7b06553 100644 --- a/web/ui/module/codemirror-promql/src/complete/promql.terms.ts +++ b/web/ui/module/codemirror-promql/src/complete/promql.terms.ts @@ -243,6 +243,12 @@ export const functionIdentifierTerms = [ info: 'Calculate quantiles from native histograms and from conventional histogram buckets', type: 'function', }, + { + label: 'histogram_quantiles', + detail: 'function', + info: 'Calculate multiple quantiles from native histograms and from conventional histogram buckets', + type: 'function', + }, { label: 'histogram_sum', detail: 'function', diff --git a/web/ui/module/codemirror-promql/src/types/function.ts b/web/ui/module/codemirror-promql/src/types/function.ts index cfbf3524b5..cc1c0524fb 100644 --- a/web/ui/module/codemirror-promql/src/types/function.ts +++ b/web/ui/module/codemirror-promql/src/types/function.ts @@ -44,6 +44,7 @@ import { HistogramCount, HistogramFraction, HistogramQuantile, + HistogramQuantiles, HistogramStdDev, HistogramStdVar, HistogramSum, @@ -306,6 +307,12 @@ const promqlFunctions: { [key: number]: PromQLFunction } = { variadic: 0, returnType: ValueType.vector, }, + [HistogramQuantiles]: { + name: 'histogram_quantiles', + argTypes: 
[ValueType.vector, ValueType.string, ValueType.scalar, ValueType.scalar], + variadic: 10, + returnType: ValueType.vector, + }, [HistogramStdDev]: { name: 'histogram_stddev', argTypes: [ValueType.vector], diff --git a/web/ui/module/lezer-promql/src/promql.grammar b/web/ui/module/lezer-promql/src/promql.grammar index 9308ad01be..e4308186bb 100644 --- a/web/ui/module/lezer-promql/src/promql.grammar +++ b/web/ui/module/lezer-promql/src/promql.grammar @@ -167,6 +167,7 @@ FunctionIdentifier { HistogramCount | HistogramFraction | HistogramQuantile | + HistogramQuantiles | HistogramStdDev | HistogramStdVar | HistogramSum | @@ -426,6 +427,7 @@ NumberDurationLiteralInDurationContext { HistogramCount { condFn<"histogram_count"> } HistogramFraction { condFn<"histogram_fraction"> } HistogramQuantile { condFn<"histogram_quantile"> } + HistogramQuantiles { condFn<"histogram_quantiles"> } HistogramStdDev { condFn<"histogram_stddev"> } HistogramStdVar { condFn<"histogram_stdvar"> } HistogramSum { condFn<"histogram_sum"> } diff --git a/web/web_test.go b/web/web_test.go index cbcf15ffdc..5ead252cbe 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -118,12 +118,10 @@ func TestReadyAndHealthy(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) - baseURL := "http://localhost" + port + waitForServerReady(t, baseURL, 5*time.Second) + resp, err := http.Get(baseURL + "/-/healthy") require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) @@ -256,12 +254,10 @@ func TestRoutePrefix(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. 
- time.Sleep(5 * time.Second) - baseURL := "http://localhost" + port + waitForServerReady(t, baseURL+opts.RoutePrefix, 5*time.Second) + resp, err := http.Get(baseURL + opts.RoutePrefix + "/-/healthy") require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) @@ -449,9 +445,9 @@ func TestShutdownWithStaleConnection(t *testing.T) { close(closed) }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) + baseURL := "http://localhost" + port + + waitForServerReady(t, baseURL, 5*time.Second) // Open a socket, and don't use it. This connection should then be closed // after the ReadTimeout. @@ -500,23 +496,19 @@ func TestHandleMultipleQuitRequests(t *testing.T) { close(closed) }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) - baseURL := opts.ExternalURL.Scheme + "://" + opts.ExternalURL.Host + waitForServerReady(t, baseURL, 5*time.Second) + start := make(chan struct{}) var wg sync.WaitGroup for range 3 { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { <-start resp, err := http.Post(baseURL+"/-/quit", "", strings.NewReader("")) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) - }() + }) } close(start) wg.Wait() @@ -578,11 +570,10 @@ func TestAgentAPIEndPoints(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) baseURL := "http://localhost" + port + "/api/v1" + waitForServerReady(t, "http://localhost"+port, 5*time.Second) + // Test for non-available endpoints in the Agent mode. 
for path, methods := range map[string][]string{ "/labels": {http.MethodGet, http.MethodPost}, @@ -711,9 +702,7 @@ func TestMultipleListenAddresses(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) + waitForServerReady(t, "http://localhost"+port1, 5*time.Second) // Set to ready. webHandler.SetReady(Ready) @@ -732,3 +721,24 @@ func TestMultipleListenAddresses(t *testing.T) { cleanupTestResponse(t, resp) } } + +// Give some time for the web goroutine to run since we need the server +// to be up before starting tests. +func waitForServerReady(t *testing.T, baseURL string, timeout time.Duration) { + t.Helper() + + interval := 100 * time.Millisecond + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + resp, err := http.Get(baseURL + "/-/healthy") + if resp != nil { + cleanupTestResponse(t, resp) + } + if err == nil && resp.StatusCode == http.StatusOK { + return + } + time.Sleep(interval) + } + t.Fatalf("Server did not become ready within %v", timeout) +}