Merge branch 'main' into fix/issue-16638-subquery-totalsamples-undercount

This commit is contained in:
Varun Chawla 2026-02-18 21:21:47 -08:00 committed by GitHub
commit 4dfc6cb53a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
41 changed files with 1017 additions and 448 deletions

View file

@ -128,8 +128,6 @@ linters:
# Disable this check for now since it introduces too many changes in our existing codebase.
# See https://pkg.go.dev/golang.org/x/tools/go/analysis/passes/modernize#hdr-Analyzer_omitzero for more details.
- omitzero
# Disable waitgroup check until we really move to Go 1.25.
- waitgroup
perfsprint:
# Optimizes even if it requires an int or uint type cast.
int-conversion: true

View file

@ -6,6 +6,7 @@ General maintainers:
* Bryan Boreham (bjboreham@gmail.com / @bboreham)
* Ayoub Mrini (ayoubmrini424@gmail.com / @machine424)
* Julien Pivotto (roidelapluie@prometheus.io / @roidelapluie)
* György Krajcsovits (<gyorgy.krajcsovits@grafana.com> / @krajorama)
Maintainers for specific parts of the codebase:
* `cmd`
@ -18,7 +19,7 @@ Maintainers for specific parts of the codebase:
* `storage`
* `remote`: Callum Styan (<callumstyan@gmail.com> / @cstyan), Bartłomiej Płotka (<bwplotka@gmail.com> / @bwplotka), Tom Wilkie (tom.wilkie@gmail.com / @tomwilkie), Alex Greenbank (<alexgreenbank@yahoo.com> / @alexgreenbank)
* `otlptranslator`: Arthur Silva Sens (<arthursens2005@gmail.com> / @ArthurSens), Arve Knudsen (<arve.knudsen@gmail.com> / @aknuds1), Jesús Vázquez (<jesus.vazquez@grafana.com> / @jesusvazquez)
* `tsdb`: Ganesh Vernekar (<ganesh@grafana.com> / @codesome), Bartłomiej Płotka (<bwplotka@gmail.com> / @bwplotka), Jesús Vázquez (<jesus.vazquez@grafana.com> / @jesusvazquez), George Krajcsovits (<gyorgy.krajcsovits@grafana.com> / @krajorama)
* `tsdb`: Ganesh Vernekar (<ganesh@grafana.com> / @codesome), Bartłomiej Płotka (<bwplotka@gmail.com> / @bwplotka), Jesús Vázquez (<jesus.vazquez@grafana.com> / @jesusvazquez)
* `web`
* `ui`: Julius Volz (<julius.volz@gmail.com> / @juliusv)
* `module`: Augustin Husson (<husson.augustin@gmail.com> / @nexucis)

View file

@ -80,6 +80,7 @@
"histogram_count": true,
"histogram_fraction": true,
"histogram_quantile": true,
"histogram_quantiles": false,
"histogram_stddev": true,
"histogram_stdvar": true,
"histogram_sum": true,

View file

@ -372,7 +372,7 @@ func main() {
os.Exit(CheckSD(*sdConfigFile, *sdJobName, *sdTimeout, prometheus.DefaultRegisterer))
case checkConfigCmd.FullCommand():
os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newConfigLintConfig(*checkConfigLint, *checkConfigLintFatal, *checkConfigIgnoreUnknownFields, model.UTF8Validation, model.Duration(*checkLookbackDelta)), *configFiles...))
os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newConfigLintConfig(*checkConfigLint, *checkConfigLintFatal, *checkConfigIgnoreUnknownFields, model.UTF8Validation, model.Duration(*checkLookbackDelta)), promtoolParser, *configFiles...))
case checkServerHealthCmd.FullCommand():
os.Exit(checkErr(CheckServerStatus(serverURL, checkHealth, httpRoundTripper)))
@ -598,7 +598,7 @@ func CheckServerStatus(serverURL *url.URL, checkEndpoint string, roundTripper ht
}
// CheckConfig validates configuration files.
func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, files ...string) int {
func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, p parser.Parser, files ...string) int {
failed := false
hasErrors := false
@ -619,7 +619,7 @@ func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig,
if !checkSyntaxOnly {
scrapeConfigsFailed := lintScrapeConfigs(scrapeConfigs, lintSettings)
failed = failed || scrapeConfigsFailed
rulesFailed, rulesHaveErrors := checkRules(ruleFiles, lintSettings.rulesLintConfig, parser.NewParser(parser.Options{}))
rulesFailed, rulesHaveErrors := checkRules(ruleFiles, lintSettings.rulesLintConfig, p)
failed = failed || rulesFailed
hasErrors = hasErrors || rulesHaveErrors
}

View file

@ -706,20 +706,21 @@ func TestCheckScrapeConfigs(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
// Non-fatal linting.
code := CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, false, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
p := parser.NewParser(parser.Options{})
code := CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, false, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
require.Equal(t, successExitCode, code, "Non-fatal linting should return success")
// Fatal linting.
code = CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
code = CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
if tc.expectError {
require.Equal(t, lintErrExitCode, code, "Fatal linting should return error")
} else {
require.Equal(t, successExitCode, code, "Fatal linting should return success when there are no problems")
}
// Check syntax only, no linting.
code = CheckConfig(false, true, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
code = CheckConfig(false, true, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
require.Equal(t, successExitCode, code, "Fatal linting should return success when checking syntax only")
// Lint option "none" should disable linting.
code = CheckConfig(false, false, newConfigLintConfig(lintOptionNone+","+lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
code = CheckConfig(false, false, newConfigLintConfig(lintOptionNone+","+lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml")
require.Equal(t, successExitCode, code, `Fatal linting should return success when lint option "none" is specified`)
})
}

View file

@ -159,17 +159,14 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
batch := lbls[:l]
lbls = lbls[l:]
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i))
if err != nil {
// exitWithError(err)
fmt.Println(" err", err)
}
total.Add(n)
}()
})
}
wg.Wait()
}

View file

@ -255,6 +255,7 @@ func (tg *testGroup) test(testname string, evalInterval time.Duration, groupOrde
Context: context.Background(),
NotifyFunc: func(context.Context, string, ...*rules.Alert) {},
Logger: promslog.NewNopLogger(),
Parser: tg.parser,
}
m := rules.NewManager(opts)
groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, nil, ignoreUnknownFields, ruleFiles...)

View file

@ -1562,11 +1562,9 @@ func TestConfigReloadAndShutdownRace(t *testing.T) {
discoveryManager.updatert = 100 * time.Millisecond
var wgDiscovery sync.WaitGroup
wgDiscovery.Add(1)
go func() {
wgDiscovery.Go(func() {
discoveryManager.Run()
wgDiscovery.Done()
}()
})
time.Sleep(time.Millisecond * 200)
var wgBg sync.WaitGroup
@ -1588,11 +1586,9 @@ func TestConfigReloadAndShutdownRace(t *testing.T) {
discoveryManager.ApplyConfig(c)
delete(c, "prometheus")
wgBg.Add(1)
go func() {
wgBg.Go(func() {
discoveryManager.ApplyConfig(c)
wgBg.Done()
}()
})
mgrCancel()
wgDiscovery.Wait()

View file

@ -433,6 +433,23 @@ and is therefore flagged by an info-level annotation reading `input to
histogram_quantile needed to be fixed for monotonicity`. If you encounter this
annotation, you should find and remove the source of the invalid data.
## `histogram_quantiles()`
**This function has to be enabled via the [feature
flag](../feature_flags.md#experimental-promql-functions)
`--enable-feature=promql-experimental-functions`.**
`histogram_quantiles(v instant-vector, quantile_label string, φ_1 scalar, φ_2 scalar, ...)` calculates multiple (between 1 and 10) φ-quantiles (0 ≤
φ ≤ 1) from a [classic
histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) or from
a native histogram. Quantile calculation works the same way as in `histogram_quantile()`.
The second argument (a string) specifies the label name that is used to identify different quantiles in the query result.
```
histogram_quantiles(sum(rate(foo[1m])), "quantile", 0.9, 0.99)
# => {quantile="0.9"} 123
{quantile="0.99"} 128
```
## `histogram_stddev()` and `histogram_stdvar()`
`histogram_stddev(v instant-vector)` returns the estimated standard deviation

View file

@ -1214,6 +1214,9 @@ type EvalNodeHelper struct {
// funcHistogramQuantile and funcHistogramFraction for classic histograms.
signatureToMetricWithBuckets map[string]*metricWithBuckets
nativeHistogramSamples []Sample
// funcHistogramQuantiles for histograms.
quantileStrs map[float64]string
signatureToLabelsWithQuantile map[string]map[float64]labels.Labels
lb *labels.Builder
lblBuf []byte
@ -1305,6 +1308,35 @@ func (enh *EvalNodeHelper) resetHistograms(inVec Vector, arg parser.Expr) annota
return annos
}
func (enh *EvalNodeHelper) getOrCreateLblsWithQuantile(lbls labels.Labels, quantileLabel string, q float64) labels.Labels {
if enh.signatureToLabelsWithQuantile == nil {
enh.signatureToLabelsWithQuantile = make(map[string]map[float64]labels.Labels)
}
enh.lblBuf = lbls.Bytes(enh.lblBuf)
cachedLbls, ok := enh.signatureToLabelsWithQuantile[string(enh.lblBuf)]
if !ok {
cachedLbls = make(map[float64]labels.Labels, len(enh.quantileStrs))
enh.signatureToLabelsWithQuantile[string(enh.lblBuf)] = cachedLbls
}
cachedLblsWithQuantile, ok := cachedLbls[q]
if !ok {
quantileStr := "NaN"
if !math.IsNaN(q) {
// Cannot do map lookup by NaN key.
quantileStr = enh.quantileStrs[q]
}
cachedLblsWithQuantile = labels.NewBuilder(lbls).
Set(quantileLabel, quantileStr).
Labels()
cachedLbls[q] = cachedLblsWithQuantile
}
return cachedLblsWithQuantile
}
// rangeEval evaluates the given expressions, and then for each step calls
// the given funcCall with the values computed for each expression at that
// step. The return value is the combination into time series of all the
@ -4332,7 +4364,7 @@ func detectHistogramStatsDecoding(expr parser.Expr) {
// further up (the latter wouldn't make sense,
// but no harm in detecting it).
n.SkipHistogramBuckets = true
case "histogram_quantile", "histogram_fraction":
case "histogram_quantile", "histogram_quantiles", "histogram_fraction":
// If we ever see a function that needs the
// whole histogram, we will not skip the
// buckets.

View file

@ -94,11 +94,9 @@ func TestQueryConcurrency(t *testing.T) {
var wg sync.WaitGroup
for range maxConcurrency {
q := engine.NewTestQuery(f)
wg.Add(1)
go func() {
wg.Go(func() {
q.Exec(ctx)
wg.Done()
}()
})
select {
case <-processing:
// Expected.
@ -108,11 +106,9 @@ func TestQueryConcurrency(t *testing.T) {
}
q := engine.NewTestQuery(f)
wg.Add(1)
go func() {
wg.Go(func() {
q.Exec(ctx)
wg.Done()
}()
})
select {
case <-processing:

View file

@ -1720,8 +1720,8 @@ func funcHistogramQuantile(vectorVals []Vector, _ Matrix, args parser.Expression
inVec := vectorVals[1]
var annos annotations.Annotations
if math.IsNaN(q) || q < 0 || q > 1 {
annos.Add(annotations.NewInvalidQuantileWarning(q, args[0].PositionRange()))
if err := validateQuantile(q, args[0]); err != nil {
annos.Add(err)
}
annos.Merge(enh.resetHistograms(inVec, args[1]))
@ -1770,6 +1770,89 @@ func funcHistogramQuantile(vectorVals []Vector, _ Matrix, args parser.Expression
return enh.Out, annos
}
func validateQuantile(q float64, arg parser.Expr) error {
if math.IsNaN(q) || q < 0 || q > 1 {
return annotations.NewInvalidQuantileWarning(q, arg.PositionRange())
}
return nil
}
// === histogram_quantiles(Vector parser.ValueTypeVector, label parser.ValueTypeString, q0 parser.ValueTypeScalar, qs parser.ValueTypeScalar...) (Vector, Annotations) ===
func funcHistogramQuantiles(vectorVals []Vector, _ Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
var (
inVec = vectorVals[0]
quantileLabel = args[1].(*parser.StringLiteral).Val
numQuantiles = len(vectorVals[2:])
qs = make([]float64, 0, numQuantiles)
annos annotations.Annotations
)
if enh.quantileStrs == nil {
enh.quantileStrs = make(map[float64]string, numQuantiles)
}
for i := 2; i < len(vectorVals); i++ {
q := vectorVals[i][0].F
if err := validateQuantile(q, args[i]); err != nil {
annos.Add(err)
}
if _, ok := enh.quantileStrs[q]; !ok {
enh.quantileStrs[q] = labels.FormatOpenMetricsFloat(q)
}
qs = append(qs, q)
}
annos.Merge(enh.resetHistograms(inVec, args[0]))
for _, q := range qs {
// Deal with the native histograms.
for _, sample := range enh.nativeHistogramSamples {
if sample.H == nil {
// Native histogram conflicts with classic histogram at the same timestamp, ignore.
continue
}
if !enh.enableDelayedNameRemoval {
sample.Metric = sample.Metric.DropReserved(schema.IsMetadataLabel)
}
hq, hqAnnos := HistogramQuantile(q, sample.H, sample.Metric.Get(model.MetricNameLabel), args[0].PositionRange())
annos.Merge(hqAnnos)
enh.Out = append(enh.Out, Sample{
Metric: enh.getOrCreateLblsWithQuantile(sample.Metric, quantileLabel, q),
F: hq,
DropName: true,
})
}
// Deal with classic histograms that have already been filtered for conflicting native histograms.
for _, mb := range enh.signatureToMetricWithBuckets {
if len(mb.buckets) > 0 {
hq, forcedMonotonicity, _, minBucket, maxBucket, maxDiff := BucketQuantile(q, mb.buckets)
if forcedMonotonicity {
metricName := ""
if enh.enableDelayedNameRemoval {
metricName = getMetricName(mb.metric)
}
annos.Add(annotations.NewHistogramQuantileForcedMonotonicityInfo(metricName, args[1].PositionRange(), enh.Ts, minBucket, maxBucket, maxDiff))
}
if !enh.enableDelayedNameRemoval {
mb.metric = mb.metric.DropReserved(schema.IsMetadataLabel)
}
enh.Out = append(enh.Out, Sample{
Metric: enh.getOrCreateLblsWithQuantile(mb.metric, quantileLabel, q),
F: hq,
DropName: true,
})
}
}
}
return enh.Out, annos
}
// pickFirstSampleIndex returns the index of the last sample before
// or at the range start, or 0 if none exist before the range start.
// If the vector selector is not anchored, it always returns 0, true.
@ -2100,6 +2183,7 @@ var FunctionCalls = map[string]FunctionCall{
"histogram_count": funcHistogramCount,
"histogram_fraction": funcHistogramFraction,
"histogram_quantile": funcHistogramQuantile,
"histogram_quantiles": funcHistogramQuantiles,
"histogram_sum": funcHistogramSum,
"histogram_stddev": funcHistogramStdDev,
"histogram_stdvar": funcHistogramStdVar,

View file

@ -205,6 +205,13 @@ var Functions = map[string]*Function{
ArgTypes: []ValueType{ValueTypeScalar, ValueTypeVector},
ReturnType: ValueTypeVector,
},
"histogram_quantiles": {
Name: "histogram_quantiles",
ArgTypes: []ValueType{ValueTypeVector, ValueTypeString, ValueTypeScalar, ValueTypeScalar},
Variadic: 9,
ReturnType: ValueTypeVector,
Experimental: true,
},
"double_exponential_smoothing": {
Name: "double_exponential_smoothing",
ArgTypes: []ValueType{ValueTypeMatrix, ValueTypeScalar, ValueTypeScalar},

View file

@ -598,6 +598,40 @@ eval instant at 50m histogram_quantile(1, testhistogram3_bucket)
{start="positive"} 1
{start="negative"} -0.1
eval instant at 50m histogram_quantiles(testhistogram3, "q", 0, 0.25, 0.5, 0.75, 1)
expect no_warn
{q="0.0", start="positive"} 0
{q="0.0", start="negative"} -0.25
{q="0.25", start="positive"} 0.055
{q="0.25", start="negative"} -0.225
{q="0.5", start="positive"} 0.125
{q="0.5", start="negative"} -0.2
{q="0.75", start="positive"} 0.45
{q="0.75", start="negative"} -0.15
{q="1.0", start="positive"} 1
{q="1.0", start="negative"} -0.1
eval instant at 50m histogram_quantiles(testhistogram3_bucket, "q", 0, 0.25, 0.5, 0.75, 1)
expect no_warn
{q="0.0", start="positive"} 0
{q="0.0", start="negative"} -0.25
{q="0.25", start="positive"} 0.055
{q="0.25", start="negative"} -0.225
{q="0.5", start="positive"} 0.125
{q="0.5", start="negative"} -0.2
{q="0.75", start="positive"} 0.45
{q="0.75", start="negative"} -0.15
{q="1.0", start="positive"} 1
{q="1.0", start="negative"} -0.1
# Break label set uniqueness.
eval instant at 50m histogram_quantiles(testhistogram3, "start", 0, 0.25, 0.5, 0.75, 1)
expect fail
eval instant at 50m histogram_quantiles(testhistogram3_bucket, "start", 0, 0.25, 0.5, 0.75, 1)
expect fail
# Quantile too low.
eval instant at 50m histogram_quantile(-0.1, testhistogram)
@ -610,6 +644,16 @@ eval instant at 50m histogram_quantile(-0.1, testhistogram_bucket)
{start="positive"} -Inf
{start="negative"} -Inf
eval instant at 50m histogram_quantiles(testhistogram, "q", -0.1)
expect warn
{q="-0.1", start="positive"} -Inf
{q="-0.1", start="negative"} -Inf
eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", -0.1)
expect warn
{q="-0.1", start="positive"} -Inf
{q="-0.1", start="negative"} -Inf
# Quantile too high.
eval instant at 50m histogram_quantile(1.01, testhistogram)
@ -622,6 +666,16 @@ eval instant at 50m histogram_quantile(1.01, testhistogram_bucket)
{start="positive"} +Inf
{start="negative"} +Inf
eval instant at 50m histogram_quantiles(testhistogram, "q", 1.01)
expect warn
{q="1.01", start="positive"} +Inf
{q="1.01", start="negative"} +Inf
eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", 1.01)
expect warn
{q="1.01", start="positive"} +Inf
{q="1.01", start="negative"} +Inf
# Quantile invalid.
eval instant at 50m histogram_quantile(NaN, testhistogram)
@ -634,9 +688,22 @@ eval instant at 50m histogram_quantile(NaN, testhistogram_bucket)
{start="positive"} NaN
{start="negative"} NaN
eval instant at 50m histogram_quantiles(testhistogram, "q", NaN)
expect warn
{q="NaN", start="positive"} NaN
{q="NaN", start="negative"} NaN
eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", NaN)
expect warn
{q="NaN", start="positive"} NaN
{q="NaN", start="negative"} NaN
eval instant at 50m histogram_quantile(NaN, non_existent)
expect warn msg: PromQL warning: quantile value should be between 0 and 1, got NaN
eval instant at 50m histogram_quantiles(non_existent, "q", NaN)
expect warn msg: PromQL warning: quantile value should be between 0 and 1, got NaN
# Quantile value in lowest bucket.
eval instant at 50m histogram_quantile(0, testhistogram)
@ -967,6 +1034,12 @@ eval instant at 50m histogram_quantile(0.99, nonmonotonic_bucket)
expect info
{} 979.75
eval instant at 50m histogram_quantiles(nonmonotonic_bucket, "q", 0.01, 0.5, 0.99)
expect info
{q="0.01"} 0.0045
{q="0.5"} 8.5
{q="0.99"} 979.75
# Buckets with different representations of the same upper bound.
eval instant at 50m histogram_quantile(0.5, rate(mixed_bucket[10m]))
{instance="ins1", job="job1"} 0.15
@ -1002,9 +1075,15 @@ load_with_nhcb 5m
eval instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*_bucket"})
expect fail
eval instant at 50m histogram_quantiles({__name__=~"request_duration_seconds\\d*_bucket"}, "q", 0.99)
expect fail
eval instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*"})
expect fail
eval instant at 50m histogram_quantiles({__name__=~"request_duration_seconds\\d*"}, "q", 0.99)
expect fail
# Histogram with constant buckets.
load_with_nhcb 1m
const_histogram_bucket{le="0.0"} 1 1 1 1 1
@ -1066,7 +1145,7 @@ eval instant at 10m histogram_sum(increase(histogram_with_reset[15m]))
clear
# Test histogram_quantile and histogram_fraction with conflicting classic and native histograms.
# Test histogram_quantile(s) and histogram_fraction with conflicting classic and native histograms.
load 1m
series{host="a"} {{schema:0 sum:5 count:4 buckets:[9 2 1]}}
series{host="a", le="0.1"} 2
@ -1081,6 +1160,11 @@ eval instant at 0 histogram_quantile(0.8, series)
expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series"
# Should return no results.
eval instant at 0 histogram_quantiles(series, "q", 0.1, 0.2)
expect no_info
expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series"
# Should return no results.
eval instant at 0 histogram_fraction(-Inf, 1, series)
expect no_info
expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series"

View file

@ -55,6 +55,10 @@ eval instant at 1m histogram_quantile(0.5, single_histogram)
expect no_info
{} 1.414213562373095
eval instant at 1m histogram_quantiles(single_histogram, "q", 0.5)
expect no_info
{q="0.5"} 1.414213562373095
clear
# Repeat the same histogram 10 times.
@ -1605,6 +1609,11 @@ eval instant at 1m histogram_quantile(0.81, histogram_nan)
{case="100% NaNs"} NaN
{case="20% NaNs"} NaN
eval instant at 1m histogram_quantiles(histogram_nan, "q", 0.81)
expect info msg: PromQL info: input to histogram_quantile has NaN observations, result is NaN for metric name "histogram_nan"
{case="100% NaNs", q="0.81"} NaN
{case="20% NaNs", q="0.81"} NaN
eval instant at 1m histogram_quantile(0.8, histogram_nan{case="100% NaNs"})
expect info msg: PromQL info: input to histogram_quantile has NaN observations, result is NaN for metric name "histogram_nan"
{case="100% NaNs"} NaN
@ -1891,6 +1900,9 @@ eval instant at 1m histogram_quantile(0.5, myHistogram2)
eval instant at 1m histogram_quantile(0.5, mixedHistogram)
expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram"
eval instant at 1m histogram_quantiles(mixedHistogram, "q", 0.5)
expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram"
clear
# A counter reset only in a bucket. Sub-queries still need to detect
@ -1960,6 +1972,9 @@ eval instant at 1m histogram_count(histogram unless histogram_quantile(0.5, hist
eval instant at 1m histogram_quantile(0.5, histogram unless histogram_count(histogram) == 0)
{} 3.1748021039363987
eval instant at 1m histogram_quantiles(histogram unless histogram_count(histogram) == 0, "q", 0.5)
{q="0.5"} 3.1748021039363987
clear
# Regression test for:

View file

@ -49,6 +49,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/teststorage"
prom_testutil "github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/util/testutil/synctest"
)
func TestMain(m *testing.M) {
@ -2010,306 +2011,306 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
func TestAsyncRuleEvaluation(t *testing.T) {
t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
synctest.Test(t, func(t *testing.T) {
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ctx := t.Context()
ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0))
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0))
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
expectedRuleCount := 6
expectedSampleCount := 4
expectedRuleCount := 6
expectedSampleCount := 4
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Nil(t, order)
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Nil(t, order)
// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
// Group duration is higher than the sum of rule durations (group overhead).
require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum())
}
// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
// Group duration is higher than the sum of rule durations (group overhead).
require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum())
}
})
})
t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
synctest.Test(t, func(t *testing.T) {
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ctx := t.Context()
expectedRuleCount := 6
expectedSampleCount := 4
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
expectedRuleCount := 6
expectedSampleCount := 4
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
})
t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
synctest.Test(t, func(t *testing.T) {
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ctx := t.Context()
expectedRuleCount := 8
expectedSampleCount := expectedRuleCount
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
expectedRuleCount := 8
expectedSampleCount := expectedRuleCount
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
// Expected evaluation order (isn't affected by concurrency settings)
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5, 6, 7},
}, order)
// Expected evaluation order (isn't affected by concurrency settings)
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5, 6, 7},
}, order)
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
})
t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
synctest.Test(t, func(t *testing.T) {
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ctx := t.Context()
expectedRuleCount := 8
expectedSampleCount := expectedRuleCount
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
expectedRuleCount := 8
expectedSampleCount := expectedRuleCount
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
for _, group := range groups {
require.Len(t, group.rules, expectedRuleCount)
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5, 6, 7},
}, order)
// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
// Group duration is less than the sum of rule durations
require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum())
}
})
})
t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx := t.Context()
ruleCount := 7
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, ruleCount)
start := time.Now()
group.Eval(ctx, start)
// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
})
t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx := t.Context()
ruleCount := 8
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
var group *Group
for _, g := range groups {
group = g
}
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5, 6, 7},
{0, 4},
{1, 2, 3, 5, 6, 7},
}, order)
// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
// Each recording rule produces one vector.
require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
// Group duration is less than the sum of rule durations
require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum())
}
})
t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 7
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, ruleCount)
start := time.Now()
group.Eval(ctx, start)
// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently.
require.EqualValues(t, 6, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 8
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
var group *Group
for _, g := range groups {
group = g
}
start := time.Now()
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 4},
{1, 2, 3, 5, 6, 7},
}, order)
group.Eval(ctx, start)
// Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently.
require.EqualValues(t, 6, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
})
})
t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
synctest.Test(t, func(t *testing.T) {
storage := teststorage.New(t)
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ctx := t.Context()
ruleCount := 7
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
ruleCount := 7
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
var group *Group
for _, g := range groups {
group = g
}
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
var group *Group
for _, g := range groups {
group = g
}
start := time.Now()
start := time.Now()
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1},
{2},
{3},
{4, 5, 6},
}, order)
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1},
{2},
{3},
{4, 5, 6},
}, order)
group.Eval(ctx, start)
group.Eval(ctx, start)
require.EqualValues(t, 3, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
require.EqualValues(t, 3, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
})
})
}
@ -2472,11 +2473,9 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) {
// Evaluate groups concurrently (like they normally do).
var wg sync.WaitGroup
for _, group := range groups {
wg.Add(1)
go func() {
wg.Go(func() {
group.Eval(ctx, time.Now())
wg.Done()
}()
})
}
wg.Wait()

View file

@ -225,7 +225,8 @@ func (h *writeHandler) appendV1Samples(app storage.Appender, ss []prompb.Sample,
if err != nil {
if errors.Is(err, storage.ErrOutOfOrderSample) ||
errors.Is(err, storage.ErrOutOfBounds) ||
errors.Is(err, storage.ErrDuplicateSampleForTimestamp) {
errors.Is(err, storage.ErrDuplicateSampleForTimestamp) ||
errors.Is(err, storage.ErrTooOldSample) {
h.logger.Error("Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp)
}
return err
@ -247,7 +248,8 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist
// a note indicating its inclusion in the future.
if errors.Is(err, storage.ErrOutOfOrderSample) ||
errors.Is(err, storage.ErrOutOfBounds) ||
errors.Is(err, storage.ErrDuplicateSampleForTimestamp) {
errors.Is(err, storage.ErrDuplicateSampleForTimestamp) ||
errors.Is(err, storage.ErrTooOldSample) {
h.logger.Error("Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)
}
return err
@ -409,7 +411,8 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
// a note indicating its inclusion in the future.
if errors.Is(err, storage.ErrOutOfOrderSample) ||
errors.Is(err, storage.ErrOutOfBounds) ||
errors.Is(err, storage.ErrDuplicateSampleForTimestamp) {
errors.Is(err, storage.ErrDuplicateSampleForTimestamp) ||
errors.Is(err, storage.ErrTooOldSample) {
// TODO(bwplotka): Not too spammy log?
h.logger.Error("Out of order histogram from remote write", "err", err.Error(), "series", ls.String(), "timestamp", hp.Timestamp)
badRequestErrs = append(badRequestErrs, fmt.Errorf("%w for series %v", err, ls.String()))

View file

@ -176,7 +176,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch {
case err == nil:
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample):
// Indicated an out of order sample is a bad request to prevent retries.
http.Error(w, err.Error(), http.StatusBadRequest)
return

View file

@ -111,10 +111,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
}
func (c *chunkWriteQueue) start() {
c.workerWg.Add(1)
go func() {
defer c.workerWg.Done()
c.workerWg.Go(func() {
for {
job, ok := c.jobs.pop()
if !ok {
@ -123,7 +120,7 @@ func (c *chunkWriteQueue) start() {
c.processJob(job)
}
}()
})
c.isRunningMtx.Lock()
c.isRunning = true

View file

@ -269,34 +269,26 @@ func TestQueuePushPopManyGoroutines(t *testing.T) {
readersWG := sync.WaitGroup{}
for range readGoroutines {
readersWG.Add(1)
go func() {
defer readersWG.Done()
readersWG.Go(func() {
for j, ok := queue.pop(); ok; j, ok = queue.pop() {
refsMx.Lock()
refs[j.seriesRef] = true
refsMx.Unlock()
}
}()
})
}
id := atomic.Uint64{}
writersWG := sync.WaitGroup{}
for range writeGoroutines {
writersWG.Add(1)
go func() {
defer writersWG.Done()
writersWG.Go(func() {
for range writes {
ref := id.Inc()
require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(ref)}))
}
}()
})
}
// Wait until all writes are done.

View file

@ -1717,10 +1717,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
// Ingest sparse histograms.
for _, ah := range allSparseSeries {
var (
@ -1743,7 +1740,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
sparseULIDs, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
require.NoError(t, err)
require.Len(t, sparseULIDs, 1)
}()
})
wg.Add(1)
go func(c testcase) {

View file

@ -929,9 +929,13 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
for _, tmpDir := range []string{walDir, dir} {
// Remove tmp dirs.
if err := removeBestEffortTmpDirs(l, tmpDir); err != nil {
if err := tsdbutil.RemoveTmpDirs(l, tmpDir, isTmpDir); err != nil {
return nil, fmt.Errorf("remove tmp dirs: %w", err)
}
// Remove any temporary checkpoints that might have been interrupted during creation.
if err := wlog.DeleteTempCheckpoints(l, tmpDir); err != nil {
return nil, fmt.Errorf("delete temp checkpoints: %w", err)
}
}
db := &DB{
@ -1115,26 +1119,6 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
return db, nil
}
func removeBestEffortTmpDirs(l *slog.Logger, dir string) error {
files, err := os.ReadDir(dir)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return err
}
for _, f := range files {
if isTmpDir(f) {
if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil {
l.Error("failed to delete tmp block dir", "dir", filepath.Join(dir, f.Name()), "err", err)
continue
}
l.Info("Found and deleted tmp block dir", "dir", filepath.Join(dir, f.Name()))
}
}
return nil
}
// StartTime implements the Storage interface.
func (db *DB) StartTime() (int64, error) {
db.mtx.RLock()
@ -2538,8 +2522,7 @@ func isBlockDir(fi fs.DirEntry) bool {
return err == nil
}
// isTmpDir returns true if the given file-info contains a block ULID, a checkpoint prefix,
// or a chunk snapshot prefix and a tmp extension.
// isTmpDir returns true if the given file-info contains a block ULID, or a chunk snapshot prefix and a tmp extension.
func isTmpDir(fi fs.DirEntry) bool {
if !fi.IsDir() {
return false
@ -2548,9 +2531,6 @@ func isTmpDir(fi fs.DirEntry) bool {
fn := fi.Name()
ext := filepath.Ext(fn)
if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy {
if strings.HasPrefix(fn, wlog.CheckpointPrefix) {
return true
}
if strings.HasPrefix(fn, chunkSnapshotPrefix) {
return true
}

View file

@ -1835,6 +1835,7 @@ func TestBlockRanges_AppendV2(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil)
require.NoError(t, err)
db.DisableCompactions()
rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1
@ -1851,21 +1852,16 @@ func TestBlockRanges_AppendV2(t *testing.T) {
require.NoError(t, err)
require.NoError(t, app.Commit())
for range 100 {
if len(db.Blocks()) == 2 {
break
}
time.Sleep(100 * time.Millisecond)
}
require.Len(t, db.Blocks(), 2, "no new block created after the set timeout")
require.NoError(t, db.Compact(ctx))
blocks := db.Blocks()
require.Len(t, blocks, 2, "no new block after compaction")
require.LessOrEqual(t, db.Blocks()[1].Meta().MinTime, db.Blocks()[0].Meta().MaxTime,
"new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
require.GreaterOrEqual(t, blocks[1].Meta().MinTime, blocks[0].Meta().MaxTime,
"new block overlaps old:%v,new:%v", blocks[0].Meta(), blocks[1].Meta())
// Test that wal records are skipped when an existing block covers the same time ranges
// and compaction doesn't create an overlapping block.
app = db.AppenderV2(ctx)
db.DisableCompactions()
_, err = app.Append(0, lbl, 0, secondBlockMaxt+1, rand.Float64(), nil, nil, storage.AOptions{})
require.NoError(t, err)
_, err = app.Append(0, lbl, 0, secondBlockMaxt+2, rand.Float64(), nil, nil, storage.AOptions{})
@ -1882,6 +1878,7 @@ func TestBlockRanges_AppendV2(t *testing.T) {
db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil)
require.NoError(t, err)
db.DisableCompactions()
defer db.Close()
require.Len(t, db.Blocks(), 3, "db doesn't include expected number of blocks")
@ -1891,17 +1888,12 @@ func TestBlockRanges_AppendV2(t *testing.T) {
_, err = app.Append(0, lbl, 0, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64(), nil, nil, storage.AOptions{}) // Trigger a compaction
require.NoError(t, err)
require.NoError(t, app.Commit())
for range 100 {
if len(db.Blocks()) == 4 {
break
}
time.Sleep(100 * time.Millisecond)
}
require.NoError(t, db.Compact(ctx))
blocks = db.Blocks()
require.Len(t, blocks, 4, "no new block after compaction")
require.Len(t, db.Blocks(), 4, "no new block created after the set timeout")
require.LessOrEqual(t, db.Blocks()[3].Meta().MinTime, db.Blocks()[2].Meta().MaxTime,
"new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
require.GreaterOrEqual(t, blocks[3].Meta().MinTime, blocks[2].Meta().MaxTime,
"new block overlaps old:%v,new:%v", blocks[2].Meta(), blocks[3].Meta())
}
// TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk.

View file

@ -2408,6 +2408,7 @@ func TestBlockRanges(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil)
require.NoError(t, err)
db.DisableCompactions()
rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1
@ -2424,21 +2425,16 @@ func TestBlockRanges(t *testing.T) {
require.NoError(t, err)
require.NoError(t, app.Commit())
for range 100 {
if len(db.Blocks()) == 2 {
break
}
time.Sleep(100 * time.Millisecond)
}
require.Len(t, db.Blocks(), 2, "no new block created after the set timeout")
require.NoError(t, db.Compact(ctx))
blocks := db.Blocks()
require.Len(t, blocks, 2, "no new block after compaction")
require.LessOrEqual(t, db.Blocks()[1].Meta().MinTime, db.Blocks()[0].Meta().MaxTime,
"new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
require.GreaterOrEqual(t, blocks[1].Meta().MinTime, blocks[0].Meta().MaxTime,
"new block overlaps old:%v,new:%v", blocks[0].Meta(), blocks[1].Meta())
// Test that wal records are skipped when an existing block covers the same time ranges
// and compaction doesn't create an overlapping block.
app = db.Appender(ctx)
db.DisableCompactions()
_, err = app.Append(0, lbl, secondBlockMaxt+1, rand.Float64())
require.NoError(t, err)
_, err = app.Append(0, lbl, secondBlockMaxt+2, rand.Float64())
@ -2455,6 +2451,7 @@ func TestBlockRanges(t *testing.T) {
db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil)
require.NoError(t, err)
db.DisableCompactions()
defer db.Close()
require.Len(t, db.Blocks(), 3, "db doesn't include expected number of blocks")
@ -2464,17 +2461,12 @@ func TestBlockRanges(t *testing.T) {
_, err = app.Append(0, lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction
require.NoError(t, err)
require.NoError(t, app.Commit())
for range 100 {
if len(db.Blocks()) == 4 {
break
}
time.Sleep(100 * time.Millisecond)
}
require.NoError(t, db.Compact(ctx))
blocks = db.Blocks()
require.Len(t, blocks, 4, "no new block after compaction")
require.Len(t, db.Blocks(), 4, "no new block created after the set timeout")
require.LessOrEqual(t, db.Blocks()[3].Meta().MinTime, db.Blocks()[2].Meta().MaxTime,
"new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
require.GreaterOrEqual(t, blocks[3].Meta().MinTime, blocks[2].Meta().MaxTime,
"new block overlaps old:%v,new:%v", blocks[2].Meta(), blocks[3].Meta())
}
// TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk.

View file

@ -1334,13 +1334,11 @@ func TestDataMissingOnQueryDuringCompaction_AppenderV2(t *testing.T) {
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
// Compacting head while the querier spans the compaction time.
require.NoError(t, db.Compact(ctx))
require.NotEmpty(t, db.Blocks())
}()
})
// Give enough time for compaction to finish.
// We expect it to be blocked until querier is closed.

View file

@ -3259,12 +3259,10 @@ func testHeadSeriesChunkRace(t *testing.T) {
defer q.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
h.updateMinMaxTime(20, 25)
h.gc()
}()
})
ss := q.Select(context.Background(), false, nil, matcher)
for ss.Next() {
}
@ -3748,13 +3746,11 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
s := ss.At()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
// Compacting head while the querier spans the compaction time.
require.NoError(t, db.Compact(ctx))
require.NotEmpty(t, db.Blocks())
}()
})
// Give enough time for compaction to finish.
// We expect it to be blocked until querier is closed.
@ -3812,13 +3808,11 @@ func TestDataMissingOnQueryDuringCompaction(t *testing.T) {
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
// Compacting head while the querier spans the compaction time.
require.NoError(t, db.Compact(ctx))
require.NotEmpty(t, db.Blocks())
}()
})
// Give enough time for compaction to finish.
// We expect it to be blocked until querier is closed.

View file

@ -956,7 +956,7 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
}
if p.At() == h.at() {
indexes = append(indexes, h.popIndex())
} else if err := h.next(); err != nil {
} else if err := h.seekHead(p.At()); err != nil {
return nil, err
}
}
@ -999,20 +999,18 @@ func (h *postingsWithIndexHeap) popIndex() int {
// at provides the storage.SeriesRef where root Postings is pointing at this moment.
func (h postingsWithIndexHeap) at() storage.SeriesRef { return h[0].p.At() }
// next performs the Postings.Next() operation on the root of the heap, performing the related operation on the heap
// and conveniently returning the result of calling Postings.Err() if the result of calling Next() was false.
// If Next() succeeds, heap is fixed to move the root to its new position, according to its Postings.At() value.
// If Next() returns fails and there's no error reported by Postings.Err(), then root is marked as removed and heap is fixed.
func (h *postingsWithIndexHeap) next() error {
// seekHead performs the Postings.Seek() operation on the root of the heap.
// If the root is exhausted or fails, it is removed from the heap.
func (h *postingsWithIndexHeap) seekHead(val storage.SeriesRef) error {
pi := (*h)[0]
next := pi.p.Next()
next := pi.p.Seek(val)
if next {
heap.Fix(h, 0)
return nil
}
if err := pi.p.Err(); err != nil {
return fmt.Errorf("postings %d: %w", pi.index, err)
return fmt.Errorf("seek postings %d: %w", pi.index, err)
}
h.popIndex()
return nil

View file

@ -1192,7 +1192,7 @@ func (p *postingsFailingAfterNthCall) Err() error {
}
func TestPostingsWithIndexHeap(t *testing.T) {
t.Run("iterate", func(t *testing.T) {
t.Run("seekHead", func(t *testing.T) {
h := postingsWithIndexHeap{
{index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})},
{index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})},
@ -1205,7 +1205,7 @@ func TestPostingsWithIndexHeap(t *testing.T) {
for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} {
require.Equal(t, expected, h.at())
require.NoError(t, h.next())
require.NoError(t, h.seekHead(h.at()+1))
}
require.True(t, h.empty())
})
@ -1223,7 +1223,7 @@ func TestPostingsWithIndexHeap(t *testing.T) {
for _, expected := range []storage.SeriesRef{1, 5, 10, 20} {
require.Equal(t, expected, h.at())
require.NoError(t, h.next())
require.NoError(t, h.seekHead(h.at()+1))
}
require.Equal(t, storage.SeriesRef(25), h.at())
node := heap.Pop(&h).(postingsWithIndex)

View file

@ -88,10 +88,7 @@ func BenchmarkIsolation(b *testing.B) {
start := make(chan struct{})
for range goroutines {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
<-start
for b.Loop() {
@ -99,7 +96,7 @@ func BenchmarkIsolation(b *testing.B) {
iso.closeAppend(appendID)
}
}()
})
}
b.ResetTimer()
@ -118,10 +115,7 @@ func BenchmarkIsolationWithState(b *testing.B) {
start := make(chan struct{})
for range goroutines {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
<-start
for b.Loop() {
@ -129,7 +123,7 @@ func BenchmarkIsolationWithState(b *testing.B) {
iso.closeAppend(appendID)
}
}()
})
}
readers := goroutines / 100
@ -138,17 +132,14 @@ func BenchmarkIsolationWithState(b *testing.B) {
}
for g := 0; g < readers; g++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
<-start
for b.Loop() {
s := iso.State(math.MinInt64, math.MaxInt64)
s.Close()
}
}()
})
}
b.ResetTimer()

View file

@ -0,0 +1,86 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/wlog"
)
// BenchmarkLabelValues_SlowPath benchmarks the performance of LabelValues when the matcher
// is far ahead of the candidate posting list. This reproduces the performance regression
// described in #14551 where dense candidates caused O(N) iteration instead of O(log N) seeking.
func BenchmarkLabelValues_SlowPath(b *testing.B) {
// Create a head with some data.
opts := DefaultHeadOptions()
opts.ChunkDirRoot = b.TempDir()
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
app := h.Appender(context.Background())
// 1. Create a large number of series for a "candidate" label (e.g. "job").
// We want these to NOT match the target matcher, but be candidates for a different label.
// We use "job=api" and "instance=..."
// We want the interaction to be:
// LabelValues("instance", "job"="api")
// "job"="api" will have 1 series at the END.
// "instance" will have 100k series.
// Actually, let's stick to the reproduction case:
// distinct values for "val1".
// "b"="1" matcher.
// Create 100k series with the same label value ("common") but without the matcher label.
// This results in a single large posting list for that value, simulating a dense candidate.
for i := range 100000 {
_, err := app.Append(0, labels.FromStrings("val1", "common", "extra", strconv.Itoa(i)), time.Now().UnixMilli(), 1)
require.NoError(b, err)
}
// Create 1 series that matches the label "b=1", with a series ID greater than all previous ones.
// This forces the intersection to skip over all 100k previous candidates.
_, err = app.Append(0, labels.FromStrings("val1", "common", "b", "1"), time.Now().UnixMilli(), 1)
require.NoError(b, err)
require.NoError(b, app.Commit())
ctx := context.Background()
matcher := labels.MustNewMatcher(labels.MatchEqual, "b", "1")
// Use the correct method to access label values.
idx, err := h.Index()
require.NoError(b, err)
b.ResetTimer()
b.ReportAllocs()
for b.Loop() {
// "val1"="common" has 100k+1 postings.
// "b=1" has 1 posting (the last one).
vals, err := idx.LabelValues(ctx, "val1", nil, matcher)
require.NoError(b, err)
require.Equal(b, []string{"common"}, vals)
}
}
// Ensure wlog/wal needed for NewHead.
var _ = wlog.WL{}

View file

@ -0,0 +1,45 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdbutil
import (
"io/fs"
"log/slog"
"os"
"path/filepath"
)
// RemoveTmpDirs attempts to remove directories in the specified directory which match the isTmpDir predicate.
// Errors encountered during reading the directory that other than non-existence are returned. All other errors
// encountered during removal of tmp directories are logged but do not cause early termination.
func RemoveTmpDirs(l *slog.Logger, dir string, isTmpDir func(fi fs.DirEntry) bool) error {
files, err := os.ReadDir(dir)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return err
}
for _, f := range files {
if isTmpDir(f) {
if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil {
l.Error("failed to delete tmp dir", "dir", filepath.Join(dir, f.Name()), "err", err)
continue
}
l.Info("Found and deleted tmp dir", "dir", filepath.Join(dir, f.Name()))
}
}
return nil
}

View file

@ -0,0 +1,124 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdbutil
import (
"io/fs"
"os"
"path/filepath"
"strings"
"testing"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
)
func TestRemoveTmpDirs(t *testing.T) {
tests := []struct {
name string
isTmpDir func(fi fs.DirEntry) bool
setup func(t *testing.T, dir string)
expectedDirs []string // Directories that should remain after cleanup
}{
{
name: "remove directories with tmp prefix",
isTmpDir: func(fi fs.DirEntry) bool {
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
},
setup: func(t *testing.T, dir string) {
require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir1"), 0o755))
require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir2"), 0o755))
require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir"), 0o755))
},
expectedDirs: []string{"normaldir"},
},
{
name: "remove directories with specific suffix",
isTmpDir: func(fi fs.DirEntry) bool {
return fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp")
},
setup: func(t *testing.T, dir string) {
require.NoError(t, os.Mkdir(filepath.Join(dir, "data.tmp"), 0o755))
require.NoError(t, os.Mkdir(filepath.Join(dir, "cache.tmp"), 0o755))
require.NoError(t, os.Mkdir(filepath.Join(dir, "permanent"), 0o755))
},
expectedDirs: []string{"permanent"},
},
{
name: "no temporary directories to remove",
isTmpDir: func(fi fs.DirEntry) bool {
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
},
setup: func(t *testing.T, dir string) {
require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir1"), 0o755))
require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir2"), 0o755))
},
expectedDirs: []string{"normaldir1", "normaldir2"},
},
{
name: "empty directory",
isTmpDir: func(fi fs.DirEntry) bool {
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
},
setup: func(_ *testing.T, _ string) {}, // No setup needed - directory is empty
expectedDirs: []string{},
},
{
name: "directory with files only (no directories)",
isTmpDir: func(fi fs.DirEntry) bool {
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
},
setup: func(t *testing.T, dir string) {
require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile1.txt"), []byte("test"), 0o644))
require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile2.txt"), []byte("test"), 0o644))
},
expectedDirs: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testDir := t.TempDir()
if tt.setup != nil {
tt.setup(t, testDir)
}
require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), testDir, tt.isTmpDir))
entries, err := os.ReadDir(testDir)
require.NoError(t, err)
// Get actual remaining directories
var actualDirs []string
for _, entry := range entries {
if entry.IsDir() {
actualDirs = append(actualDirs, entry.Name())
}
}
require.ElementsMatch(t, tt.expectedDirs, actualDirs, "Remaining directories don't match expected")
})
}
}
func TestRemoveTmpDirs_NonExistentDirectory(t *testing.T) {
testDir := t.TempDir()
nonExistent := filepath.Join(testDir, "does_not_exist")
require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), nonExistent, func(_ fs.DirEntry) bool {
return true
}))
}

View file

@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"log/slog"
"math"
"os"
@ -31,6 +32,7 @@ import (
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
// CheckpointStats returns stats about a created checkpoint.
@ -80,8 +82,16 @@ func DeleteCheckpoints(dir string, maxIndex int) error {
return errors.Join(errs...)
}
// CheckpointPrefix is the prefix used for checkpoint files.
const CheckpointPrefix = "checkpoint."
// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files.
const checkpointTempFileSuffix = ".tmp"
// DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory.
func DeleteTempCheckpoints(logger *slog.Logger, dir string) error {
if err := tsdbutil.RemoveTmpDirs(logger, dir, isTempDir); err != nil {
return fmt.Errorf("remove previous temporary checkpoint dirs: %w", err)
}
return nil
}
// Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL.
// It includes the most recent checkpoint if it exists.
@ -123,13 +133,13 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
defer sgmReader.Close()
}
cpdir := checkpointDir(w.Dir(), to)
cpdirtmp := cpdir + ".tmp"
if err := os.RemoveAll(cpdirtmp); err != nil {
return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err)
if err := DeleteTempCheckpoints(logger, w.Dir()); err != nil {
return nil, err
}
cpdir := checkpointDir(w.Dir(), to)
cpdirtmp := cpdir + checkpointTempFileSuffix
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
return nil, fmt.Errorf("create checkpoint dir: %w", err)
}
@ -394,8 +404,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
return stats, nil
}
// checkpointPrefix is the prefix used for checkpoint files.
const checkpointPrefix = "checkpoint."
func checkpointDir(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf(CheckpointPrefix+"%08d", i))
return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i))
}
type checkpointRef struct {
@ -411,13 +424,13 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) {
for i := range files {
fi := files[i]
if !strings.HasPrefix(fi.Name(), CheckpointPrefix) {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
if !fi.IsDir() {
return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name())
}
idx, err := strconv.Atoi(fi.Name()[len(CheckpointPrefix):])
idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
@ -431,3 +444,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) {
return refs, nil
}
func isTempDir(fi fs.DirEntry) bool {
return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix)
}

View file

@ -417,3 +417,81 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
})
require.NoError(t, err)
}
func TestCheckpointDeletesTemporaryCheckpoints(t *testing.T) {
dir := t.TempDir()
// Create one tmp checkpoint directory
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00001000.tmp"), 0o777))
w, err := New(nil, nil, dir, compression.None)
require.NoError(t, err)
defer w.Close()
_, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1000, func(_ chunks.HeadSeriesRef) bool { return true }, 1000)
require.NoError(t, err)
files, err := os.ReadDir(dir)
require.NoError(t, err)
var actualDirectories []string
for _, f := range files {
if !f.IsDir() {
continue
}
actualDirectories = append(actualDirectories, f.Name())
}
require.Equal(t, []string{"checkpoint.00001000"}, actualDirectories)
}
func TestDeleteTempCheckpoints(t *testing.T) {
testCases := []struct {
name string
checkpointDirectoriesToCreate []string
expectedDirectories []string
}{
{
name: "no tmp checkpoints",
checkpointDirectoriesToCreate: nil,
expectedDirectories: nil,
},
{
name: "one tmp checkpoint",
checkpointDirectoriesToCreate: []string{"checkpoint.00001000.tmp"},
expectedDirectories: nil,
},
{
name: "many tmp checkpoints",
checkpointDirectoriesToCreate: []string{"checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000.tmp"},
expectedDirectories: nil,
},
{
name: "mix of tmp and regular checkpoints",
checkpointDirectoriesToCreate: []string{"checkpoint.00000001", "checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000"},
expectedDirectories: []string{"checkpoint.00000001", "checkpoint.00002000"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
for _, fn := range tc.checkpointDirectoriesToCreate {
require.NoError(t, os.MkdirAll(filepath.Join(dir, fn), 0o777))
}
require.NoError(t, DeleteTempCheckpoints(promslog.NewNopLogger(), dir))
files, err := os.ReadDir(dir)
require.NoError(t, err)
var actualDirectories []string
for _, f := range files {
if !f.IsDir() {
continue
}
actualDirectories = append(actualDirectories, f.Name())
}
require.Equal(t, tc.expectedDirectories, actualDirectories)
})
}
}

View file

@ -265,8 +265,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
}
}
tc.wg.Add(1)
go func() {
tc.wg.Go(func() {
numWatchers.Inc()
// Pass up zookeeper events, until the node is deleted.
select {
@ -277,8 +276,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
case <-node.done:
}
numWatchers.Dec()
tc.wg.Done()
}()
})
return nil
}

View file

@ -1543,6 +1543,33 @@ const funcDocs: Record<string, React.ReactNode> = {
</p>
</>
),
histogram_quantiles: (
<>
<p>
<strong>
This function has to be enabled via the{" "}
<a href="../feature_flags.md#experimental-promql-functions">feature flag</a>
<code>--enable-feature=promql-experimental-functions</code>.
</strong>
</p>
<p>
<code>histogram_quantiles(v instant-vector, quantile_label string, φ_1 scalar, φ_2 scalar, ...)</code>{" "}
calculates multiple (between 1 and 10) φ-quantiles (0 φ 1) from a{" "}
<a href="https://prometheus.io/docs/concepts/metric_types/#histogram">classic histogram</a> or from a native
histogram. Quantile calculation works the same way as in <code>histogram_quantile()</code>. The second argument
(a string) specifies the label name that is used to identify different quantiles in the query result.
</p>
<pre>
<code>
histogram_quantiles(sum(rate(foo[1m])), &quot;quantile&quot;, 0.9, 0.99) # =&gt; {"{"}quantile=&quot;0.9&quot;
{"}"} 123
{"{"}quantile=&quot;0.99&quot;{"}"} 128
</code>
</pre>
</>
),
histogram_stddev: (
<>
<p>

View file

@ -69,6 +69,12 @@ export const functionSignatures: Record<string, Func> = {
variadic: 0,
returnType: valueType.vector,
},
histogram_quantiles: {
name: "histogram_quantiles",
argTypes: [valueType.vector, valueType.string, valueType.scalar, valueType.scalar],
variadic: 9,
returnType: valueType.vector,
},
histogram_stddev: {
name: "histogram_stddev",
argTypes: [valueType.vector],

View file

@ -243,6 +243,12 @@ export const functionIdentifierTerms = [
info: 'Calculate quantiles from native histograms and from conventional histogram buckets',
type: 'function',
},
{
label: 'histogram_quantiles',
detail: 'function',
info: 'Calculate multiple quantiles from native histograms and from conventional histogram buckets',
type: 'function',
},
{
label: 'histogram_sum',
detail: 'function',

View file

@ -44,6 +44,7 @@ import {
HistogramCount,
HistogramFraction,
HistogramQuantile,
HistogramQuantiles,
HistogramStdDev,
HistogramStdVar,
HistogramSum,
@ -306,6 +307,12 @@ const promqlFunctions: { [key: number]: PromQLFunction } = {
variadic: 0,
returnType: ValueType.vector,
},
[HistogramQuantiles]: {
name: 'histogram_quantiles',
argTypes: [ValueType.vector, ValueType.string, ValueType.scalar, ValueType.scalar],
variadic: 10,
returnType: ValueType.vector,
},
[HistogramStdDev]: {
name: 'histogram_stddev',
argTypes: [ValueType.vector],

View file

@ -167,6 +167,7 @@ FunctionIdentifier {
HistogramCount |
HistogramFraction |
HistogramQuantile |
HistogramQuantiles |
HistogramStdDev |
HistogramStdVar |
HistogramSum |
@ -426,6 +427,7 @@ NumberDurationLiteralInDurationContext {
HistogramCount { condFn<"histogram_count"> }
HistogramFraction { condFn<"histogram_fraction"> }
HistogramQuantile { condFn<"histogram_quantile"> }
HistogramQuantiles { condFn<"histogram_quantiles"> }
HistogramStdDev { condFn<"histogram_stddev"> }
HistogramStdVar { condFn<"histogram_stdvar"> }
HistogramSum { condFn<"histogram_sum"> }

View file

@ -118,12 +118,10 @@ func TestReadyAndHealthy(t *testing.T) {
}
}()
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
baseURL := "http://localhost" + port
waitForServerReady(t, baseURL, 5*time.Second)
resp, err := http.Get(baseURL + "/-/healthy")
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
@ -256,12 +254,10 @@ func TestRoutePrefix(t *testing.T) {
}
}()
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
baseURL := "http://localhost" + port
waitForServerReady(t, baseURL+opts.RoutePrefix, 5*time.Second)
resp, err := http.Get(baseURL + opts.RoutePrefix + "/-/healthy")
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
@ -449,9 +445,9 @@ func TestShutdownWithStaleConnection(t *testing.T) {
close(closed)
}()
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
baseURL := "http://localhost" + port
waitForServerReady(t, baseURL, 5*time.Second)
// Open a socket, and don't use it. This connection should then be closed
// after the ReadTimeout.
@ -500,23 +496,19 @@ func TestHandleMultipleQuitRequests(t *testing.T) {
close(closed)
}()
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
baseURL := opts.ExternalURL.Scheme + "://" + opts.ExternalURL.Host
waitForServerReady(t, baseURL, 5*time.Second)
start := make(chan struct{})
var wg sync.WaitGroup
for range 3 {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
<-start
resp, err := http.Post(baseURL+"/-/quit", "", strings.NewReader(""))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}()
})
}
close(start)
wg.Wait()
@ -578,11 +570,10 @@ func TestAgentAPIEndPoints(t *testing.T) {
}
}()
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
baseURL := "http://localhost" + port + "/api/v1"
waitForServerReady(t, "http://localhost"+port, 5*time.Second)
// Test for non-available endpoints in the Agent mode.
for path, methods := range map[string][]string{
"/labels": {http.MethodGet, http.MethodPost},
@ -711,9 +702,7 @@ func TestMultipleListenAddresses(t *testing.T) {
}
}()
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
waitForServerReady(t, "http://localhost"+port1, 5*time.Second)
// Set to ready.
webHandler.SetReady(Ready)
@ -732,3 +721,24 @@ func TestMultipleListenAddresses(t *testing.T) {
cleanupTestResponse(t, resp)
}
}
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
func waitForServerReady(t *testing.T, baseURL string, timeout time.Duration) {
t.Helper()
interval := 100 * time.Millisecond
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
resp, err := http.Get(baseURL + "/-/healthy")
if resp != nil {
cleanupTestResponse(t, resp)
}
if err == nil && resp.StatusCode == http.StatusOK {
return
}
time.Sleep(interval)
}
t.Fatalf("Server did not become ready within %v", timeout)
}