From e3e104816633d75c50b4f6d849db1bc6a11fac1a Mon Sep 17 00:00:00 2001 From: 3Juhwan <13selfesteem91@naver.com> Date: Tue, 4 Nov 2025 16:51:41 -0800 Subject: [PATCH 01/10] Isolate fix: Remove 5s sleep for 99% speedup. Discarded unwanted code. Signed-off-by: 3Juhwan <13selfesteem91@naver.com> Signed-off-by: Sammy Tran --- web/web_test.go | 54 ++++++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/web/web_test.go b/web/web_test.go index b07e26cfa8..aa63d09aef 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -118,12 +118,10 @@ func TestReadyAndHealthy(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) - baseURL := "http://localhost" + port + waitForServerReady(t, baseURL, 5*time.Second) + resp, err := http.Get(baseURL + "/-/healthy") require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) @@ -235,12 +233,10 @@ func TestRoutePrefix(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) - baseURL := "http://localhost" + port + waitForServerReady(t, baseURL+opts.RoutePrefix, 5*time.Second) + resp, err := http.Get(baseURL + opts.RoutePrefix + "/-/healthy") require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) @@ -426,9 +422,9 @@ func TestShutdownWithStaleConnection(t *testing.T) { close(closed) }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) + baseURL := "http://localhost" + port + + waitForServerReady(t, baseURL, 5*time.Second) // Open a socket, and don't use it. This connection should then be closed // after the ReadTimeout. 
@@ -477,12 +473,10 @@ func TestHandleMultipleQuitRequests(t *testing.T) { close(closed) }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) - baseURL := opts.ExternalURL.Scheme + "://" + opts.ExternalURL.Host + waitForServerReady(t, baseURL, 5*time.Second) + start := make(chan struct{}) var wg sync.WaitGroup for range 3 { @@ -555,11 +549,10 @@ func TestAgentAPIEndPoints(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) baseURL := "http://localhost" + port + "/api/v1" + waitForServerReady(t, "http://localhost"+port, 5*time.Second) + // Test for non-available endpoints in the Agent mode. for path, methods := range map[string][]string{ "/labels": {http.MethodGet, http.MethodPost}, @@ -688,9 +681,7 @@ func TestMultipleListenAddresses(t *testing.T) { } }() - // Give some time for the web goroutine to run since we need the server - // to be up before starting tests. - time.Sleep(5 * time.Second) + waitForServerReady(t, "http://localhost"+port1, 5*time.Second) // Set to ready. webHandler.SetReady(Ready) @@ -709,3 +700,24 @@ func TestMultipleListenAddresses(t *testing.T) { cleanupTestResponse(t, resp) } } + +// Give some time for the web goroutine to run since we need the server +// to be up before starting tests. 
+func waitForServerReady(t *testing.T, baseURL string, timeout time.Duration) { + t.Helper() + + interval := 100 * time.Millisecond + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + resp, err := http.Get(baseURL + "/-/healthy") + if resp != nil { + cleanupTestResponse(t, resp) + } + if err == nil && resp.StatusCode == http.StatusOK { + return + } + time.Sleep(interval) + } + t.Fatalf("Server did not become ready within %v", timeout) +} From fcb68060cb8d24c7a3f55802ccce6d73626b3d9b Mon Sep 17 00:00:00 2001 From: Divyansh Mishra Date: Sat, 14 Feb 2026 14:33:17 +0530 Subject: [PATCH 02/10] tsdb: Optimize LabelValues for sparse intersections (Fixes #14551) Signed-off-by: Divyansh Mishra --- tsdb/index/postings.go | 14 +++--- tsdb/index/postings_test.go | 6 +-- tsdb/label_values_bench_test.go | 86 +++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 11 deletions(-) create mode 100644 tsdb/label_values_bench_test.go diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 31b93f850d..c0bf213c45 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -956,7 +956,7 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int, } if p.At() == h.at() { indexes = append(indexes, h.popIndex()) - } else if err := h.next(); err != nil { + } else if err := h.seekHead(p.At()); err != nil { return nil, err } } @@ -999,20 +999,18 @@ func (h *postingsWithIndexHeap) popIndex() int { // at provides the storage.SeriesRef where root Postings is pointing at this moment. func (h postingsWithIndexHeap) at() storage.SeriesRef { return h[0].p.At() } -// next performs the Postings.Next() operation on the root of the heap, performing the related operation on the heap -// and conveniently returning the result of calling Postings.Err() if the result of calling Next() was false. -// If Next() succeeds, heap is fixed to move the root to its new position, according to its Postings.At() value. 
-// If Next() returns fails and there's no error reported by Postings.Err(), then root is marked as removed and heap is fixed. -func (h *postingsWithIndexHeap) next() error { +// seekHead performs the Postings.Seek() operation on the root of the heap. +// If the root is exhausted or fails, it is removed from the heap. +func (h *postingsWithIndexHeap) seekHead(val storage.SeriesRef) error { pi := (*h)[0] - next := pi.p.Next() + next := pi.p.Seek(val) if next { heap.Fix(h, 0) return nil } if err := pi.p.Err(); err != nil { - return fmt.Errorf("postings %d: %w", pi.index, err) + return fmt.Errorf("seek postings %d: %w", pi.index, err) } h.popIndex() return nil diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 77b43f76ab..5c67a2da6d 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1192,7 +1192,7 @@ func (p *postingsFailingAfterNthCall) Err() error { } func TestPostingsWithIndexHeap(t *testing.T) { - t.Run("iterate", func(t *testing.T) { + t.Run("seekHead", func(t *testing.T) { h := postingsWithIndexHeap{ {index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})}, {index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})}, @@ -1205,7 +1205,7 @@ func TestPostingsWithIndexHeap(t *testing.T) { for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} { require.Equal(t, expected, h.at()) - require.NoError(t, h.next()) + require.NoError(t, h.seekHead(h.at()+1)) } require.True(t, h.empty()) }) @@ -1223,7 +1223,7 @@ func TestPostingsWithIndexHeap(t *testing.T) { for _, expected := range []storage.SeriesRef{1, 5, 10, 20} { require.Equal(t, expected, h.at()) - require.NoError(t, h.next()) + require.NoError(t, h.seekHead(h.at()+1)) } require.Equal(t, storage.SeriesRef(25), h.at()) node := heap.Pop(&h).(postingsWithIndex) diff --git a/tsdb/label_values_bench_test.go b/tsdb/label_values_bench_test.go new file mode 100644 index 0000000000..1e55cf80c0 --- /dev/null +++ b/tsdb/label_values_bench_test.go @@ -0,0 
+1,86 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/wlog" +) + +// BenchmarkLabelValues_SlowPath benchmarks the performance of LabelValues when the matcher +// is far ahead of the candidate posting list. This reproduces the performance regression +// described in #14551 where dense candidates caused O(N) iteration instead of O(log N) seeking. +func BenchmarkLabelValues_SlowPath(b *testing.B) { + // Create a head with some data. + opts := DefaultHeadOptions() + opts.ChunkDirRoot = b.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(b, err) + defer h.Close() + + app := h.Appender(context.Background()) + // 1. Create a large number of series for a "candidate" label (e.g. "job"). + // We want these to NOT match the target matcher, but be candidates for a different label. + // We use "job=api" and "instance=..." + // We want the interaction to be: + // LabelValues("instance", "job"="api") + // "job"="api" will have 1 series at the END. + // "instance" will have 100k series. + + // Actually, let's stick to the reproduction case: + // distinct values for "val1". + // "b"="1" matcher. + + // Create 100k series with the same label value ("common") but without the matcher label. 
+ // This results in a single large posting list for that value, simulating a dense candidate. + for i := range 100000 { + _, err := app.Append(0, labels.FromStrings("val1", "common", "extra", strconv.Itoa(i)), time.Now().UnixMilli(), 1) + require.NoError(b, err) + } + + // Create 1 series that matches the label "b=1", with a series ID greater than all previous ones. + // This forces the intersection to skip over all 100k previous candidates. + _, err = app.Append(0, labels.FromStrings("val1", "common", "b", "1"), time.Now().UnixMilli(), 1) + require.NoError(b, err) + + require.NoError(b, app.Commit()) + + ctx := context.Background() + matcher := labels.MustNewMatcher(labels.MatchEqual, "b", "1") + + // Use the correct method to access label values. + idx, err := h.Index() + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + // "val1"="common" has 100k+1 postings. + // "b=1" has 1 posting (the last one). + vals, err := idx.LabelValues(ctx, "val1", nil, matcher) + require.NoError(b, err) + require.Equal(b, []string{"common"}, vals) + } +} + +// Ensure wlog/wal needed for NewHead. +var _ = wlog.WL{} From e72bc1381c7ce829efe8c3ac97c36b0753c7ba2d Mon Sep 17 00:00:00 2001 From: Varun Chawla Date: Sat, 14 Feb 2026 02:49:49 -0800 Subject: [PATCH 03/10] fix: handle ErrTooOldSample as 400 Bad Request in OTLP and v2 histogram write paths The OTLP write handler and the PRW v2 histogram append path were missing ErrTooOldSample from their error type checks, causing these errors to fall through to the default case and return HTTP 500 Internal Server Error. This triggered unnecessary retries in OTLP clients like the Python SDK. The PRW v1 write handler (line 115) and the PRW v2 sample append path (line 377) already correctly handle ErrTooOldSample as a 400, and this change makes the remaining paths consistent. Also adds ErrTooOldSample to the v1 sample/histogram log checks so these errors are properly logged instead of silently returned. 
Fixes #16645 Signed-off-by: Varun Chawla --- storage/remote/write_handler.go | 9 ++++++--- storage/remote/write_otlp_handler.go | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 3dac96f6a0..9fdd750692 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -225,7 +225,8 @@ func (h *writeHandler) appendV1Samples(app storage.Appender, ss []prompb.Sample, if err != nil { if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(err, storage.ErrOutOfBounds) || - errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) || + errors.Is(err, storage.ErrTooOldSample) { h.logger.Error("Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) } return err @@ -247,7 +248,8 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist // a note indicating its inclusion in the future. if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(err, storage.ErrOutOfBounds) || - errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) || + errors.Is(err, storage.ErrTooOldSample) { h.logger.Error("Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) } return err @@ -409,7 +411,8 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // a note indicating its inclusion in the future. if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(err, storage.ErrOutOfBounds) || - errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) || + errors.Is(err, storage.ErrTooOldSample) { // TODO(bwplotka): Not too spammy log? 
h.logger.Error("Out of order histogram from remote write", "err", err.Error(), "series", ls.String(), "timestamp", hp.Timestamp) badRequestErrs = append(badRequestErrs, fmt.Errorf("%w for series %v", err, ls.String())) diff --git a/storage/remote/write_otlp_handler.go b/storage/remote/write_otlp_handler.go index b8888baeb8..6cb4a0fff0 100644 --- a/storage/remote/write_otlp_handler.go +++ b/storage/remote/write_otlp_handler.go @@ -176,7 +176,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case err == nil: - case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): + case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample): // Indicated an out of order sample is a bad request to prevent retries. http.Error(w, err.Error(), http.StatusBadRequest) return From b0718d5c932a8ef781671e5d45e2177dc91383fa Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Sun, 15 Feb 2026 11:44:06 +0100 Subject: [PATCH 04/10] tsdb: fix flaky TestBlockRanges by using explicit compaction Replace polling loops (for range 100 { time.Sleep }) with explicit db.Compact() calls after disabling background compaction, eliminating CI flakiness on slow machines. Also fix incorrect overlap assertions that were checking the wrong direction (LessOrEqual -> GreaterOrEqual). 
Signed-off-by: Arve Knudsen --- tsdb/db_append_v2_test.go | 32 ++++++++++++-------------------- tsdb/db_test.go | 32 ++++++++++++-------------------- 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/tsdb/db_append_v2_test.go b/tsdb/db_append_v2_test.go index e6bcfb696d..8083829537 100644 --- a/tsdb/db_append_v2_test.go +++ b/tsdb/db_append_v2_test.go @@ -1835,6 +1835,7 @@ func TestBlockRanges_AppendV2(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT)) db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1 @@ -1851,21 +1852,16 @@ func TestBlockRanges_AppendV2(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 2 { - break - } - time.Sleep(100 * time.Millisecond) - } - require.Len(t, db.Blocks(), 2, "no new block created after the set timeout") + require.NoError(t, db.Compact(ctx)) + blocks := db.Blocks() + require.Len(t, blocks, 2, "no new block after compaction") - require.LessOrEqual(t, db.Blocks()[1].Meta().MinTime, db.Blocks()[0].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta()) + require.GreaterOrEqual(t, blocks[1].Meta().MinTime, blocks[0].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[0].Meta(), blocks[1].Meta()) // Test that wal records are skipped when an existing block covers the same time ranges // and compaction doesn't create an overlapping block. 
app = db.AppenderV2(ctx) - db.DisableCompactions() _, err = app.Append(0, lbl, 0, secondBlockMaxt+1, rand.Float64(), nil, nil, storage.AOptions{}) require.NoError(t, err) _, err = app.Append(0, lbl, 0, secondBlockMaxt+2, rand.Float64(), nil, nil, storage.AOptions{}) @@ -1882,6 +1878,7 @@ func TestBlockRanges_AppendV2(t *testing.T) { db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() defer db.Close() require.Len(t, db.Blocks(), 3, "db doesn't include expected number of blocks") @@ -1891,17 +1888,12 @@ func TestBlockRanges_AppendV2(t *testing.T) { _, err = app.Append(0, lbl, 0, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64(), nil, nil, storage.AOptions{}) // Trigger a compaction require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 4 { - break - } - time.Sleep(100 * time.Millisecond) - } + require.NoError(t, db.Compact(ctx)) + blocks = db.Blocks() + require.Len(t, blocks, 4, "no new block after compaction") - require.Len(t, db.Blocks(), 4, "no new block created after the set timeout") - - require.LessOrEqual(t, db.Blocks()[3].Meta().MinTime, db.Blocks()[2].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) + require.GreaterOrEqual(t, blocks[3].Meta().MinTime, blocks[2].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[2].Meta(), blocks[3].Meta()) } // TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk. 
diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 18e969f952..3f2861d633 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2408,6 +2408,7 @@ func TestBlockRanges(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT)) db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1 @@ -2424,21 +2425,16 @@ func TestBlockRanges(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 2 { - break - } - time.Sleep(100 * time.Millisecond) - } - require.Len(t, db.Blocks(), 2, "no new block created after the set timeout") + require.NoError(t, db.Compact(ctx)) + blocks := db.Blocks() + require.Len(t, blocks, 2, "no new block after compaction") - require.LessOrEqual(t, db.Blocks()[1].Meta().MinTime, db.Blocks()[0].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta()) + require.GreaterOrEqual(t, blocks[1].Meta().MinTime, blocks[0].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[0].Meta(), blocks[1].Meta()) // Test that wal records are skipped when an existing block covers the same time ranges // and compaction doesn't create an overlapping block. 
app = db.Appender(ctx) - db.DisableCompactions() _, err = app.Append(0, lbl, secondBlockMaxt+1, rand.Float64()) require.NoError(t, err) _, err = app.Append(0, lbl, secondBlockMaxt+2, rand.Float64()) @@ -2455,6 +2451,7 @@ func TestBlockRanges(t *testing.T) { db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) + db.DisableCompactions() defer db.Close() require.Len(t, db.Blocks(), 3, "db doesn't include expected number of blocks") @@ -2464,17 +2461,12 @@ func TestBlockRanges(t *testing.T) { _, err = app.Append(0, lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction require.NoError(t, err) require.NoError(t, app.Commit()) - for range 100 { - if len(db.Blocks()) == 4 { - break - } - time.Sleep(100 * time.Millisecond) - } + require.NoError(t, db.Compact(ctx)) + blocks = db.Blocks() + require.Len(t, blocks, 4, "no new block after compaction") - require.Len(t, db.Blocks(), 4, "no new block created after the set timeout") - - require.LessOrEqual(t, db.Blocks()[3].Meta().MinTime, db.Blocks()[2].Meta().MaxTime, - "new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) + require.GreaterOrEqual(t, blocks[3].Meta().MinTime, blocks[2].Meta().MaxTime, + "new block overlaps old:%v,new:%v", blocks[2].Meta(), blocks[3].Meta()) } // TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk. From 3d2647dc6f5d1816fdc1f19b35d73dc5d0b1002e Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 16 Feb 2026 10:30:46 +0100 Subject: [PATCH 05/10] rules: fix flaky TestAsyncRuleEvaluation on Windows (#17965) Convert all timing-sensitive subtests of TestAsyncRuleEvaluation to use synctest for deterministic testing. This fixes flakiness on Windows caused by timer granularity and scheduling variance. 
The timing assertions are preserved using synctest's fake time, which allows accurate verification of sequential vs concurrent execution timing without relying on wall-clock time. Fixes #17961 Signed-off-by: Arve Knudsen --- rules/manager_test.go | 473 +++++++++++++++++++++--------------------- 1 file changed, 237 insertions(+), 236 deletions(-) diff --git a/rules/manager_test.go b/rules/manager_test.go index 1b9f4be7d5..27930fc4c7 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -49,6 +49,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/teststorage" prom_testutil "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestMain(m *testing.M) { @@ -2010,306 +2011,306 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { func TestAsyncRuleEvaluation(t *testing.T) { t.Run("synchronous evaluation with independent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) - expectedRuleCount := 6 - expectedSampleCount := 4 + expectedRuleCount := 6 + expectedSampleCount := 4 - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Nil(t, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Nil(t, order) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is higher than the sum of rule durations (group overhead). - require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is higher than the sum of rule durations (group overhead). 
+ require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) }) t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 6 - expectedSampleCount := 4 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 6 + expectedSampleCount := 4 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. 
- require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. 
+ opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order (isn't affected by concurrency settings) - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, - }, order) + // Expected evaluation order (isn't affected by concurrency settings) + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. - require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. 
+ require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) + + start := time.Now() + + DefaultEvalIterationFunc(ctx, group, start) + + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is less than the sum of rule durations + require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) + }) + + t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + }) + + t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 8 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, + {0, 4}, + {1, 2, 3, 5, 6, 7}, }, order) - // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. - require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) - // Some rules should execute concurrently so should complete quicker. 
- require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is less than the sum of rule durations - require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } - }) - - t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - - for _, group := range groups { - require.Len(t, group.rules, ruleCount) - - start := time.Now() - group.Eval(ctx, start) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. + require.EqualValues(t, 6, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. 
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } - }) - - t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 8 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } - - start := time.Now() - - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 4}, - {1, 2, 3, 5, 6, 7}, - }, order) - - group.Eval(ctx, start) - - // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. - require.EqualValues(t, 6, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. 
- require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } - start := time.Now() + start := time.Now() - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1}, - {2}, - {3}, - {4, 5, 6}, - }, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1}, + {2}, + {3}, + {4, 5, 6}, + }, order) - group.Eval(ctx, start) + group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, 3, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) } From f230a3ad0e7f7730b7e79d296a3a81046c063102 Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Mon, 16 Feb 2026 11:53:05 +0100 Subject: [PATCH 06/10] Move krajorama to general maintainer (#18095) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit He's been participating in the bug scrub for a year and provides reviews all over the code base. Also fix name spelling. 
Signed-off-by: György Krajcsovits --- MAINTAINERS.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 2e4a982382..ae61059af5 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -6,6 +6,7 @@ General maintainers: * Bryan Boreham (bjboreham@gmail.com / @bboreham) * Ayoub Mrini (ayoubmrini424@gmail.com / @machine424) * Julien Pivotto (roidelapluie@prometheus.io / @roidelapluie) +* György Krajcsovits ( / @krajorama) Maintainers for specific parts of the codebase: * `cmd` @@ -18,7 +19,7 @@ Maintainers for specific parts of the codebase: * `storage` * `remote`: Callum Styan ( / @cstyan), Bartłomiej Płotka ( / @bwplotka), Tom Wilkie (tom.wilkie@gmail.com / @tomwilkie), Alex Greenbank ( / @alexgreenbank) * `otlptranslator`: Arthur Silva Sens ( / @ArthurSens), Arve Knudsen ( / @aknuds1), Jesús Vázquez ( / @jesusvazquez) -* `tsdb`: Ganesh Vernekar ( / @codesome), Bartłomiej Płotka ( / @bwplotka), Jesús Vázquez ( / @jesusvazquez), George Krajcsovits ( / @krajorama) +* `tsdb`: Ganesh Vernekar ( / @codesome), Bartłomiej Płotka ( / @bwplotka), Jesús Vázquez ( / @jesusvazquez) * `web` * `ui`: Julius Volz ( / @juliusv) * `module`: Augustin Husson ( / @nexucis) From 78020ad60ea11485f8087dce949e5f034d197673 Mon Sep 17 00:00:00 2001 From: Martin Valiente Ainz <64830185+tinitiuset@users.noreply.github.com> Date: Mon, 16 Feb 2026 15:38:26 +0100 Subject: [PATCH 07/10] promtool: fix --enable-feature flags ignored in check config and test rules (#18097) Both are regressions from the parser refactoring in #17977. 
- Fixes #18092 - Fixes #18093 Signed-off-by: Martin Valiente Ainz <64830185+tinitiuset@users.noreply.github.com> --- cmd/promtool/main.go | 6 +++--- cmd/promtool/main_test.go | 9 +++++---- cmd/promtool/unittest.go | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index abb709c31d..183b918ba0 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -372,7 +372,7 @@ func main() { os.Exit(CheckSD(*sdConfigFile, *sdJobName, *sdTimeout, prometheus.DefaultRegisterer)) case checkConfigCmd.FullCommand(): - os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newConfigLintConfig(*checkConfigLint, *checkConfigLintFatal, *checkConfigIgnoreUnknownFields, model.UTF8Validation, model.Duration(*checkLookbackDelta)), *configFiles...)) + os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newConfigLintConfig(*checkConfigLint, *checkConfigLintFatal, *checkConfigIgnoreUnknownFields, model.UTF8Validation, model.Duration(*checkLookbackDelta)), promtoolParser, *configFiles...)) case checkServerHealthCmd.FullCommand(): os.Exit(checkErr(CheckServerStatus(serverURL, checkHealth, httpRoundTripper))) @@ -598,7 +598,7 @@ func CheckServerStatus(serverURL *url.URL, checkEndpoint string, roundTripper ht } // CheckConfig validates configuration files. 
-func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, files ...string) int { +func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, p parser.Parser, files ...string) int { failed := false hasErrors := false @@ -619,7 +619,7 @@ func CheckConfig(agentMode, checkSyntaxOnly bool, lintSettings configLintConfig, if !checkSyntaxOnly { scrapeConfigsFailed := lintScrapeConfigs(scrapeConfigs, lintSettings) failed = failed || scrapeConfigsFailed - rulesFailed, rulesHaveErrors := checkRules(ruleFiles, lintSettings.rulesLintConfig, parser.NewParser(parser.Options{})) + rulesFailed, rulesHaveErrors := checkRules(ruleFiles, lintSettings.rulesLintConfig, p) failed = failed || rulesFailed hasErrors = hasErrors || rulesHaveErrors } diff --git a/cmd/promtool/main_test.go b/cmd/promtool/main_test.go index 3b1730d894..297dd35d70 100644 --- a/cmd/promtool/main_test.go +++ b/cmd/promtool/main_test.go @@ -706,20 +706,21 @@ func TestCheckScrapeConfigs(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // Non-fatal linting. - code := CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, false, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + p := parser.NewParser(parser.Options{}) + code := CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, false, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") require.Equal(t, successExitCode, code, "Non-fatal linting should return success") // Fatal linting. 
- code = CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + code = CheckConfig(false, false, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") if tc.expectError { require.Equal(t, lintErrExitCode, code, "Fatal linting should return error") } else { require.Equal(t, successExitCode, code, "Fatal linting should return success when there are no problems") } // Check syntax only, no linting. - code = CheckConfig(false, true, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + code = CheckConfig(false, true, newConfigLintConfig(lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") require.Equal(t, successExitCode, code, "Fatal linting should return success when checking syntax only") // Lint option "none" should disable linting. 
- code = CheckConfig(false, false, newConfigLintConfig(lintOptionNone+","+lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") + code = CheckConfig(false, false, newConfigLintConfig(lintOptionNone+","+lintOptionTooLongScrapeInterval, true, false, model.UTF8Validation, tc.lookbackDelta), p, "./testdata/prometheus-config.lint.too_long_scrape_interval.yml") require.Equal(t, successExitCode, code, `Fatal linting should return success when lint option "none" is specified`) }) } diff --git a/cmd/promtool/unittest.go b/cmd/promtool/unittest.go index 7e3db94501..c9278d8a46 100644 --- a/cmd/promtool/unittest.go +++ b/cmd/promtool/unittest.go @@ -255,6 +255,7 @@ func (tg *testGroup) test(testname string, evalInterval time.Duration, groupOrde Context: context.Background(), NotifyFunc: func(context.Context, string, ...*rules.Alert) {}, Logger: promslog.NewNopLogger(), + Parser: tg.parser, } m := rules.NewManager(opts) groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, nil, ignoreUnknownFields, ruleFiles...) 
From ae062151cd52526dbfb875c72a9f7cb7f2377cf8 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Tue, 17 Feb 2026 03:23:54 -0500 Subject: [PATCH 08/10] tsdb/wlog: Remove any temporary checkpoints when creating a Checkpoint (#17598) * RemoveTmpDirs function to tsdbutil * Refactor db to use RemoveTmpDirs and no longer cleanup checkpoint tmp dirs * Use RemoveTmpDirs in wlog checkpoint to cleanup all checkpoint tmp folders * Add tests for RemoveTmpDirs * Ensure db.Open will still cleanup extra temporary checkpoints Signed-off-by: Kyle Eckhart --- tsdb/db.go | 32 ++----- tsdb/tsdbutil/remove_tmp_dirs.go | 45 ++++++++++ tsdb/tsdbutil/remove_tmp_dirs_test.go | 124 ++++++++++++++++++++++++++ tsdb/wlog/checkpoint.go | 37 +++++--- tsdb/wlog/checkpoint_test.go | 78 ++++++++++++++++ 5 files changed, 280 insertions(+), 36 deletions(-) create mode 100644 tsdb/tsdbutil/remove_tmp_dirs.go create mode 100644 tsdb/tsdbutil/remove_tmp_dirs_test.go diff --git a/tsdb/db.go b/tsdb/db.go index 1d73628bfd..a4a4a77f3c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -929,9 +929,13 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn for _, tmpDir := range []string{walDir, dir} { // Remove tmp dirs. - if err := removeBestEffortTmpDirs(l, tmpDir); err != nil { + if err := tsdbutil.RemoveTmpDirs(l, tmpDir, isTmpDir); err != nil { return nil, fmt.Errorf("remove tmp dirs: %w", err) } + // Remove any temporary checkpoints that might have been interrupted during creation.
+ if err := wlog.DeleteTempCheckpoints(l, tmpDir); err != nil { + return nil, fmt.Errorf("delete temp checkpoints: %w", err) + } } db := &DB{ @@ -1115,26 +1119,6 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn return db, nil } -func removeBestEffortTmpDirs(l *slog.Logger, dir string) error { - files, err := os.ReadDir(dir) - if os.IsNotExist(err) { - return nil - } - if err != nil { - return err - } - for _, f := range files { - if isTmpDir(f) { - if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil { - l.Error("failed to delete tmp block dir", "dir", filepath.Join(dir, f.Name()), "err", err) - continue - } - l.Info("Found and deleted tmp block dir", "dir", filepath.Join(dir, f.Name())) - } - } - return nil -} - // StartTime implements the Storage interface. func (db *DB) StartTime() (int64, error) { db.mtx.RLock() @@ -2538,8 +2522,7 @@ func isBlockDir(fi fs.DirEntry) bool { return err == nil } -// isTmpDir returns true if the given file-info contains a block ULID, a checkpoint prefix, -// or a chunk snapshot prefix and a tmp extension. +// isTmpDir returns true if the given file-info contains a block ULID, or a chunk snapshot prefix and a tmp extension. 
func isTmpDir(fi fs.DirEntry) bool { if !fi.IsDir() { return false @@ -2548,9 +2531,6 @@ func isTmpDir(fi fs.DirEntry) bool { fn := fi.Name() ext := filepath.Ext(fn) if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy { - if strings.HasPrefix(fn, wlog.CheckpointPrefix) { - return true - } if strings.HasPrefix(fn, chunkSnapshotPrefix) { return true } diff --git a/tsdb/tsdbutil/remove_tmp_dirs.go b/tsdb/tsdbutil/remove_tmp_dirs.go new file mode 100644 index 0000000000..a95db3159e --- /dev/null +++ b/tsdb/tsdbutil/remove_tmp_dirs.go @@ -0,0 +1,45 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdbutil + +import ( + "io/fs" + "log/slog" + "os" + "path/filepath" +) + +// RemoveTmpDirs attempts to remove directories in the specified directory which match the isTmpDir predicate. +// Errors encountered during reading the directory other than non-existence are returned. All other errors +// encountered during removal of tmp directories are logged but do not cause early termination.
+func RemoveTmpDirs(l *slog.Logger, dir string, isTmpDir func(fi fs.DirEntry) bool) error { + files, err := os.ReadDir(dir) + if os.IsNotExist(err) { + return nil + } + if err != nil { + return err + } + for _, f := range files { + if isTmpDir(f) { + if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil { + l.Error("failed to delete tmp dir", "dir", filepath.Join(dir, f.Name()), "err", err) + continue + } + l.Info("Found and deleted tmp dir", "dir", filepath.Join(dir, f.Name())) + } + } + return nil +} diff --git a/tsdb/tsdbutil/remove_tmp_dirs_test.go b/tsdb/tsdbutil/remove_tmp_dirs_test.go new file mode 100644 index 0000000000..4ab282d3b3 --- /dev/null +++ b/tsdb/tsdbutil/remove_tmp_dirs_test.go @@ -0,0 +1,124 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdbutil + +import ( + "io/fs" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" +) + +func TestRemoveTmpDirs(t *testing.T) { + tests := []struct { + name string + isTmpDir func(fi fs.DirEntry) bool + setup func(t *testing.T, dir string) + expectedDirs []string // Directories that should remain after cleanup + }{ + { + name: "remove directories with tmp prefix", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir1"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir2"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir"), 0o755)) + }, + expectedDirs: []string{"normaldir"}, + }, + { + name: "remove directories with specific suffix", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "data.tmp"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "cache.tmp"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "permanent"), 0o755)) + }, + expectedDirs: []string{"permanent"}, + }, + { + name: "no temporary directories to remove", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir1"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir2"), 0o755)) + }, + expectedDirs: []string{"normaldir1", "normaldir2"}, + }, + { + name: "empty directory", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(_ *testing.T, _ string) {}, // No setup needed - directory is empty + expectedDirs: []string{}, + }, + { + name: 
"directory with files only (no directories)", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile1.txt"), []byte("test"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile2.txt"), []byte("test"), 0o644)) + }, + expectedDirs: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testDir := t.TempDir() + + if tt.setup != nil { + tt.setup(t, testDir) + } + + require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), testDir, tt.isTmpDir)) + + entries, err := os.ReadDir(testDir) + require.NoError(t, err) + + // Get actual remaining directories + var actualDirs []string + for _, entry := range entries { + if entry.IsDir() { + actualDirs = append(actualDirs, entry.Name()) + } + } + + require.ElementsMatch(t, tt.expectedDirs, actualDirs, "Remaining directories don't match expected") + }) + } +} + +func TestRemoveTmpDirs_NonExistentDirectory(t *testing.T) { + testDir := t.TempDir() + nonExistent := filepath.Join(testDir, "does_not_exist") + + require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), nonExistent, func(_ fs.DirEntry) bool { + return true + })) +} diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 6742141fbc..3a4e194fec 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "io/fs" "log/slog" "math" "os" @@ -31,6 +32,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/tsdb/tsdbutil" ) // CheckpointStats returns stats about a created checkpoint. @@ -80,8 +82,16 @@ func DeleteCheckpoints(dir string, maxIndex int) error { return errors.Join(errs...) } -// CheckpointPrefix is the prefix used for checkpoint files. 
-const CheckpointPrefix = "checkpoint." +// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files. +const checkpointTempFileSuffix = ".tmp" + +// DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory. +func DeleteTempCheckpoints(logger *slog.Logger, dir string) error { + if err := tsdbutil.RemoveTmpDirs(logger, dir, isTempDir); err != nil { + return fmt.Errorf("remove previous temporary checkpoint dirs: %w", err) + } + return nil +} // Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL. // It includes the most recent checkpoint if it exists. @@ -123,13 +133,13 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He defer sgmReader.Close() } - cpdir := checkpointDir(w.Dir(), to) - cpdirtmp := cpdir + ".tmp" - - if err := os.RemoveAll(cpdirtmp); err != nil { - return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err) + if err := DeleteTempCheckpoints(logger, w.Dir()); err != nil { + return nil, err } + cpdir := checkpointDir(w.Dir(), to) + cpdirtmp := cpdir + checkpointTempFileSuffix + if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return nil, fmt.Errorf("create checkpoint dir: %w", err) } @@ -394,8 +404,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He return stats, nil } +// checkpointPrefix is the prefix used for checkpoint files. +const checkpointPrefix = "checkpoint." 
+ func checkpointDir(dir string, i int) string { - return filepath.Join(dir, fmt.Sprintf(CheckpointPrefix+"%08d", i)) + return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i)) } type checkpointRef struct { @@ -411,13 +424,13 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { for i := range files { fi := files[i] - if !strings.HasPrefix(fi.Name(), CheckpointPrefix) { + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { continue } if !fi.IsDir() { return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name()) } - idx, err := strconv.Atoi(fi.Name()[len(CheckpointPrefix):]) + idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) if err != nil { continue } @@ -431,3 +444,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { return refs, nil } + +func isTempDir(fi fs.DirEntry) bool { + return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix) +} diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 97ca2e768d..a348239ec7 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -417,3 +417,81 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { }) require.NoError(t, err) } + +func TestCheckpointDeletesTemporaryCheckpoints(t *testing.T) { + dir := t.TempDir() + + // Create one tmp checkpoint directory + require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00001000.tmp"), 0o777)) + + w, err := New(nil, nil, dir, compression.None) + require.NoError(t, err) + defer w.Close() + + _, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1000, func(_ chunks.HeadSeriesRef) bool { return true }, 1000) + require.NoError(t, err) + + files, err := os.ReadDir(dir) + require.NoError(t, err) + + var actualDirectories []string + for _, f := range files { + if !f.IsDir() { + continue + } + actualDirectories = append(actualDirectories, f.Name()) + } + require.Equal(t, []string{"checkpoint.00001000"}, 
actualDirectories) +} + +func TestDeleteTempCheckpoints(t *testing.T) { + testCases := []struct { + name string + checkpointDirectoriesToCreate []string + expectedDirectories []string + }{ + { + name: "no tmp checkpoints", + checkpointDirectoriesToCreate: nil, + expectedDirectories: nil, + }, + { + name: "one tmp checkpoint", + checkpointDirectoriesToCreate: []string{"checkpoint.00001000.tmp"}, + expectedDirectories: nil, + }, + { + name: "many tmp checkpoints", + checkpointDirectoriesToCreate: []string{"checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000.tmp"}, + expectedDirectories: nil, + }, + { + name: "mix of tmp and regular checkpoints", + checkpointDirectoriesToCreate: []string{"checkpoint.00000001", "checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000"}, + expectedDirectories: []string{"checkpoint.00000001", "checkpoint.00002000"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + for _, fn := range tc.checkpointDirectoriesToCreate { + require.NoError(t, os.MkdirAll(filepath.Join(dir, fn), 0o777)) + } + + require.NoError(t, DeleteTempCheckpoints(promslog.NewNopLogger(), dir)) + + files, err := os.ReadDir(dir) + require.NoError(t, err) + + var actualDirectories []string + for _, f := range files { + if !f.IsDir() { + continue + } + actualDirectories = append(actualDirectories, f.Name()) + } + require.Equal(t, tc.expectedDirectories, actualDirectories) + }) + } +} From 7d0a39ac934ff30e89074336801c5f6b536c1f3c Mon Sep 17 00:00:00 2001 From: Julien Pivotto <291750+roidelapluie@users.noreply.github.com> Date: Tue, 17 Feb 2026 11:59:11 +0100 Subject: [PATCH 09/10] chore(lint): enable wg.Go Since our minimum supported go version is now go 1.25, we can use wg.Go. 
Signed-off-by: Julien Pivotto <291750+roidelapluie@users.noreply.github.com> --- .golangci.yml | 2 -- cmd/promtool/tsdb.go | 7 ++----- discovery/manager_test.go | 12 ++++-------- promql/engine_test.go | 12 ++++-------- rules/manager_test.go | 6 ++---- tsdb/chunks/chunk_write_queue.go | 7 ++----- tsdb/chunks/queue_test.go | 16 ++++------------ tsdb/compact_test.go | 7 ++----- tsdb/head_append_v2_test.go | 6 ++---- tsdb/head_test.go | 18 ++++++------------ tsdb/isolation_test.go | 21 ++++++--------------- util/treecache/treecache.go | 6 ++---- web/web_test.go | 6 ++---- 13 files changed, 38 insertions(+), 88 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 8cb3265f4f..05a23b53b2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -128,8 +128,6 @@ linters: # Disable this check for now since it introduces too many changes in our existing codebase. # See https://pkg.go.dev/golang.org/x/tools/go/analysis/passes/modernize#hdr-Analyzer_omitzero for more details. - omitzero - # Disable waitgroup check until we really move to Go 1.25. - - waitgroup perfsprint: # Optimizes even if it requires an int or uint type cast. 
int-conversion: true diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 1aaf87bc42..f43da0e1d0 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -159,17 +159,14 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u batch := lbls[:l] lbls = lbls[l:] - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i)) if err != nil { // exitWithError(err) fmt.Println(" err", err) } total.Add(n) - }() + }) } wg.Wait() } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 162730d9aa..8a49005100 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -1562,11 +1562,9 @@ func TestConfigReloadAndShutdownRace(t *testing.T) { discoveryManager.updatert = 100 * time.Millisecond var wgDiscovery sync.WaitGroup - wgDiscovery.Add(1) - go func() { + wgDiscovery.Go(func() { discoveryManager.Run() - wgDiscovery.Done() - }() + }) time.Sleep(time.Millisecond * 200) var wgBg sync.WaitGroup @@ -1588,11 +1586,9 @@ func TestConfigReloadAndShutdownRace(t *testing.T) { discoveryManager.ApplyConfig(c) delete(c, "prometheus") - wgBg.Add(1) - go func() { + wgBg.Go(func() { discoveryManager.ApplyConfig(c) - wgBg.Done() - }() + }) mgrCancel() wgDiscovery.Wait() diff --git a/promql/engine_test.go b/promql/engine_test.go index f911419c62..5dfffd7cc7 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -94,11 +94,9 @@ func TestQueryConcurrency(t *testing.T) { var wg sync.WaitGroup for range maxConcurrency { q := engine.NewTestQuery(f) - wg.Add(1) - go func() { + wg.Go(func() { q.Exec(ctx) - wg.Done() - }() + }) select { case <-processing: // Expected. 
@@ -108,11 +106,9 @@ func TestQueryConcurrency(t *testing.T) { } q := engine.NewTestQuery(f) - wg.Add(1) - go func() { + wg.Go(func() { q.Exec(ctx) - wg.Done() - }() + }) select { case <-processing: diff --git a/rules/manager_test.go b/rules/manager_test.go index 27930fc4c7..19c815e50c 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -2473,11 +2473,9 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { // Evaluate groups concurrently (like they normally do). var wg sync.WaitGroup for _, group := range groups { - wg.Add(1) - go func() { + wg.Go(func() { group.Eval(ctx, time.Now()) - wg.Done() - }() + }) } wg.Wait() diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 1a046ea00a..a87c2602cd 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -111,10 +111,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu } func (c *chunkWriteQueue) start() { - c.workerWg.Add(1) - go func() { - defer c.workerWg.Done() - + c.workerWg.Go(func() { for { job, ok := c.jobs.pop() if !ok { @@ -123,7 +120,7 @@ func (c *chunkWriteQueue) start() { c.processJob(job) } - }() + }) c.isRunningMtx.Lock() c.isRunning = true diff --git a/tsdb/chunks/queue_test.go b/tsdb/chunks/queue_test.go index 377a8181ff..2e3fff59a8 100644 --- a/tsdb/chunks/queue_test.go +++ b/tsdb/chunks/queue_test.go @@ -269,34 +269,26 @@ func TestQueuePushPopManyGoroutines(t *testing.T) { readersWG := sync.WaitGroup{} for range readGoroutines { - readersWG.Add(1) - - go func() { - defer readersWG.Done() - + readersWG.Go(func() { for j, ok := queue.pop(); ok; j, ok = queue.pop() { refsMx.Lock() refs[j.seriesRef] = true refsMx.Unlock() } - }() + }) } id := atomic.Uint64{} writersWG := sync.WaitGroup{} for range writeGoroutines { - writersWG.Add(1) - - go func() { - defer writersWG.Done() - + writersWG.Go(func() { for range writes { ref := id.Inc() require.True(t, queue.push(chunkWriteJob{seriesRef: 
HeadSeriesRef(ref)})) } - }() + }) } // Wait until all writes are done. diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index afe15a5f31..44a0921eec 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1717,10 +1717,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { // Ingest sparse histograms. for _, ah := range allSparseSeries { var ( @@ -1743,7 +1740,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { sparseULIDs, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) require.NoError(t, err) require.Len(t, sparseULIDs, 1) - }() + }) wg.Add(1) go func(c testcase) { diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index 082d756e60..61b2eecf4e 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -1334,13 +1334,11 @@ func TestDataMissingOnQueryDuringCompaction_AppenderV2(t *testing.T) { require.NoError(t, err) var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // Compacting head while the querier spans the compaction time. require.NoError(t, db.Compact(ctx)) require.NotEmpty(t, db.Blocks()) - }() + }) // Give enough time for compaction to finish. // We expect it to be blocked until querier is closed. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 7b8ae0ecbd..142fbc18e7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3259,12 +3259,10 @@ func testHeadSeriesChunkRace(t *testing.T) { defer q.Close() var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { h.updateMinMaxTime(20, 25) h.gc() - }() + }) ss := q.Select(context.Background(), false, nil, matcher) for ss.Next() { } @@ -3748,13 +3746,11 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { s := ss.At() var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // Compacting head while the querier spans the compaction time. 
require.NoError(t, db.Compact(ctx)) require.NotEmpty(t, db.Blocks()) - }() + }) // Give enough time for compaction to finish. // We expect it to be blocked until querier is closed. @@ -3812,13 +3808,11 @@ func TestDataMissingOnQueryDuringCompaction(t *testing.T) { require.NoError(t, err) var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // Compacting head while the querier spans the compaction time. require.NoError(t, db.Compact(ctx)) require.NotEmpty(t, db.Blocks()) - }() + }) // Give enough time for compaction to finish. // We expect it to be blocked until querier is closed. diff --git a/tsdb/isolation_test.go b/tsdb/isolation_test.go index f2671024e8..2b2e1a6487 100644 --- a/tsdb/isolation_test.go +++ b/tsdb/isolation_test.go @@ -88,10 +88,7 @@ func BenchmarkIsolation(b *testing.B) { start := make(chan struct{}) for range goroutines { - wg.Add(1) - - go func() { - defer wg.Done() + wg.Go(func() { <-start for b.Loop() { @@ -99,7 +96,7 @@ func BenchmarkIsolation(b *testing.B) { iso.closeAppend(appendID) } - }() + }) } b.ResetTimer() @@ -118,10 +115,7 @@ func BenchmarkIsolationWithState(b *testing.B) { start := make(chan struct{}) for range goroutines { - wg.Add(1) - - go func() { - defer wg.Done() + wg.Go(func() { <-start for b.Loop() { @@ -129,7 +123,7 @@ func BenchmarkIsolationWithState(b *testing.B) { iso.closeAppend(appendID) } - }() + }) } readers := goroutines / 100 @@ -138,17 +132,14 @@ func BenchmarkIsolationWithState(b *testing.B) { } for g := 0; g < readers; g++ { - wg.Add(1) - - go func() { - defer wg.Done() + wg.Go(func() { <-start for b.Loop() { s := iso.State(math.MinInt64, math.MaxInt64) s.Close() } - }() + }) } b.ResetTimer() diff --git a/util/treecache/treecache.go b/util/treecache/treecache.go index 32912c5a94..deb950b55a 100644 --- a/util/treecache/treecache.go +++ b/util/treecache/treecache.go @@ -265,8 +265,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr } } - tc.wg.Add(1) 
- go func() { + tc.wg.Go(func() { numWatchers.Inc() // Pass up zookeeper events, until the node is deleted. select { @@ -277,8 +276,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr case <-node.done: } numWatchers.Dec() - tc.wg.Done() - }() + }) return nil } diff --git a/web/web_test.go b/web/web_test.go index cbcf15ffdc..ff486beee0 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -509,14 +509,12 @@ func TestHandleMultipleQuitRequests(t *testing.T) { start := make(chan struct{}) var wg sync.WaitGroup for range 3 { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { <-start resp, err := http.Post(baseURL+"/-/quit", "", strings.NewReader("")) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) - }() + }) } close(start) wg.Wait() From 5bd0d00f8ca1ec808015b30f30b3cf1dba656518 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linas=20Med=C5=BEi=C5=ABnas?= Date: Wed, 18 Feb 2026 18:32:29 +0200 Subject: [PATCH 10/10] PromQL: Add experimental histogram_quantiles variadic function (#17285) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Linas Medziunas Signed-off-by: Björn Rabenstein Signed-off-by: beorn7 Co-authored-by: Björn Rabenstein Co-authored-by: beorn7 --- cmd/prometheus/testdata/features.json | 1 + docs/querying/functions.md | 17 ++++ promql/engine.go | 34 ++++++- promql/functions.go | 88 ++++++++++++++++++- promql/parser/functions.go | 7 ++ promql/promqltest/testdata/histograms.test | 86 +++++++++++++++++- .../testdata/native_histograms.test | 15 ++++ web/ui/mantine-ui/src/promql/functionDocs.tsx | 27 ++++++ .../src/promql/functionSignatures.ts | 6 ++ .../src/complete/promql.terms.ts | 6 ++ .../codemirror-promql/src/types/function.ts | 7 ++ web/ui/module/lezer-promql/src/promql.grammar | 2 + 12 files changed, 292 insertions(+), 4 deletions(-) diff --git a/cmd/prometheus/testdata/features.json b/cmd/prometheus/testdata/features.json index 
c39f60ab33..5fc01aa195 100644 --- a/cmd/prometheus/testdata/features.json +++ b/cmd/prometheus/testdata/features.json @@ -80,6 +80,7 @@ "histogram_count": true, "histogram_fraction": true, "histogram_quantile": true, + "histogram_quantiles": false, "histogram_stddev": true, "histogram_stdvar": true, "histogram_sum": true, diff --git a/docs/querying/functions.md b/docs/querying/functions.md index 3a9b7025f8..68a003359d 100644 --- a/docs/querying/functions.md +++ b/docs/querying/functions.md @@ -433,6 +433,23 @@ and is therefore flagged by an info-level annotation reading `input to histogram_quantile needed to be fixed for monotonicity`. If you encounter this annotation, you should find and remove the source of the invalid data. +## `histogram_quantiles()` + +**This function has to be enabled via the [feature +flag](../feature_flags.md#experimental-promql-functions) +`--enable-feature=promql-experimental-functions`.** + +`histogram_quantiles(v instant-vector, quantile_label string, φ_1 scalar, φ_2 scalar, ...)` calculates multiple (between 1 and 10) φ-quantiles (0 ≤ +φ ≤ 1) from a [classic +histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) or from +a native histogram. Quantile calculation works the same way as in `histogram_quantile()`. +The second argument (a string) specifies the label name that is used to identify different quantiles in the query result. +``` +histogram_quantiles(sum(rate(foo[1m])), "quantile", 0.9, 0.99) +# => {quantile="0.9"} 123 + {quantile="0.99"} 128 +``` + ## `histogram_stddev()` and `histogram_stdvar()` `histogram_stddev(v instant-vector)` returns the estimated standard deviation diff --git a/promql/engine.go b/promql/engine.go index eb41e40605..bd7b868d86 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1214,6 +1214,9 @@ type EvalNodeHelper struct { // funcHistogramQuantile and funcHistogramFraction for classic histograms. 
signatureToMetricWithBuckets map[string]*metricWithBuckets nativeHistogramSamples []Sample + // funcHistogramQuantiles for histograms. + quantileStrs map[float64]string + signatureToLabelsWithQuantile map[string]map[float64]labels.Labels lb *labels.Builder lblBuf []byte @@ -1305,6 +1308,35 @@ func (enh *EvalNodeHelper) resetHistograms(inVec Vector, arg parser.Expr) annota return annos } +func (enh *EvalNodeHelper) getOrCreateLblsWithQuantile(lbls labels.Labels, quantileLabel string, q float64) labels.Labels { + if enh.signatureToLabelsWithQuantile == nil { + enh.signatureToLabelsWithQuantile = make(map[string]map[float64]labels.Labels) + } + + enh.lblBuf = lbls.Bytes(enh.lblBuf) + cachedLbls, ok := enh.signatureToLabelsWithQuantile[string(enh.lblBuf)] + if !ok { + cachedLbls = make(map[float64]labels.Labels, len(enh.quantileStrs)) + enh.signatureToLabelsWithQuantile[string(enh.lblBuf)] = cachedLbls + } + + cachedLblsWithQuantile, ok := cachedLbls[q] + if !ok { + quantileStr := "NaN" + if !math.IsNaN(q) { + // Cannot do map lookup by NaN key. + quantileStr = enh.quantileStrs[q] + } + cachedLblsWithQuantile = labels.NewBuilder(lbls). + Set(quantileLabel, quantileStr). + Labels() + + cachedLbls[q] = cachedLblsWithQuantile + } + + return cachedLblsWithQuantile +} + // rangeEval evaluates the given expressions, and then for each step calls // the given funcCall with the values computed for each expression at that // step. The return value is the combination into time series of all the @@ -4320,7 +4352,7 @@ func detectHistogramStatsDecoding(expr parser.Expr) { // further up (the latter wouldn't make sense, // but no harm in detecting it). n.SkipHistogramBuckets = true - case "histogram_quantile", "histogram_fraction": + case "histogram_quantile", "histogram_quantiles", "histogram_fraction": // If we ever see a function that needs the // whole histogram, we will not skip the // buckets. 
diff --git a/promql/functions.go b/promql/functions.go index 2cb90a9b6c..546f94df12 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -1720,8 +1720,8 @@ func funcHistogramQuantile(vectorVals []Vector, _ Matrix, args parser.Expression inVec := vectorVals[1] var annos annotations.Annotations - if math.IsNaN(q) || q < 0 || q > 1 { - annos.Add(annotations.NewInvalidQuantileWarning(q, args[0].PositionRange())) + if err := validateQuantile(q, args[0]); err != nil { + annos.Add(err) } annos.Merge(enh.resetHistograms(inVec, args[1])) @@ -1770,6 +1770,89 @@ func funcHistogramQuantile(vectorVals []Vector, _ Matrix, args parser.Expression return enh.Out, annos } +func validateQuantile(q float64, arg parser.Expr) error { + if math.IsNaN(q) || q < 0 || q > 1 { + return annotations.NewInvalidQuantileWarning(q, arg.PositionRange()) + } + return nil +} + +// === histogram_quantiles(Vector parser.ValueTypeVector, label parser.ValueTypeString, q0 parser.ValueTypeScalar, qs parser.ValueTypeScalar...) (Vector, Annotations) === +func funcHistogramQuantiles(vectorVals []Vector, _ Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + var ( + inVec = vectorVals[0] + quantileLabel = args[1].(*parser.StringLiteral).Val + numQuantiles = len(vectorVals[2:]) + qs = make([]float64, 0, numQuantiles) + + annos annotations.Annotations + ) + + if enh.quantileStrs == nil { + enh.quantileStrs = make(map[float64]string, numQuantiles) + } + for i := 2; i < len(vectorVals); i++ { + q := vectorVals[i][0].F + + if err := validateQuantile(q, args[i]); err != nil { + annos.Add(err) + } + + if _, ok := enh.quantileStrs[q]; !ok { + enh.quantileStrs[q] = labels.FormatOpenMetricsFloat(q) + } + qs = append(qs, q) + } + + annos.Merge(enh.resetHistograms(inVec, args[0])) + + for _, q := range qs { + // Deal with the native histograms. 
+ for _, sample := range enh.nativeHistogramSamples { + if sample.H == nil { + // Native histogram conflicts with classic histogram at the same timestamp, ignore. + continue + } + if !enh.enableDelayedNameRemoval { + sample.Metric = sample.Metric.DropReserved(schema.IsMetadataLabel) + } + hq, hqAnnos := HistogramQuantile(q, sample.H, sample.Metric.Get(model.MetricNameLabel), args[0].PositionRange()) + annos.Merge(hqAnnos) + enh.Out = append(enh.Out, Sample{ + Metric: enh.getOrCreateLblsWithQuantile(sample.Metric, quantileLabel, q), + F: hq, + DropName: true, + }) + } + + // Deal with classic histograms that have already been filtered for conflicting native histograms. + for _, mb := range enh.signatureToMetricWithBuckets { + if len(mb.buckets) > 0 { + hq, forcedMonotonicity, _, minBucket, maxBucket, maxDiff := BucketQuantile(q, mb.buckets) + if forcedMonotonicity { + metricName := "" + if enh.enableDelayedNameRemoval { + metricName = getMetricName(mb.metric) + } + annos.Add(annotations.NewHistogramQuantileForcedMonotonicityInfo(metricName, args[0].PositionRange(), enh.Ts, minBucket, maxBucket, maxDiff)) + } + + if !enh.enableDelayedNameRemoval { + mb.metric = mb.metric.DropReserved(schema.IsMetadataLabel) + } + + enh.Out = append(enh.Out, Sample{ + Metric: enh.getOrCreateLblsWithQuantile(mb.metric, quantileLabel, q), + F: hq, + DropName: true, + }) + } + } + } + + return enh.Out, annos +} + // pickFirstSampleIndex returns the index of the last sample before // or at the range start, or 0 if none exist before the range start. // If the vector selector is not anchored, it always returns 0, true. 
@@ -2100,6 +2183,7 @@ var FunctionCalls = map[string]FunctionCall{ "histogram_count": funcHistogramCount, "histogram_fraction": funcHistogramFraction, "histogram_quantile": funcHistogramQuantile, + "histogram_quantiles": funcHistogramQuantiles, "histogram_sum": funcHistogramSum, "histogram_stddev": funcHistogramStdDev, "histogram_stdvar": funcHistogramStdVar, diff --git a/promql/parser/functions.go b/promql/parser/functions.go index c7c7332305..180a255ab0 100644 --- a/promql/parser/functions.go +++ b/promql/parser/functions.go @@ -205,6 +205,13 @@ var Functions = map[string]*Function{ ArgTypes: []ValueType{ValueTypeScalar, ValueTypeVector}, ReturnType: ValueTypeVector, }, + "histogram_quantiles": { + Name: "histogram_quantiles", + ArgTypes: []ValueType{ValueTypeVector, ValueTypeString, ValueTypeScalar, ValueTypeScalar}, + Variadic: 9, + ReturnType: ValueTypeVector, + Experimental: true, + }, "double_exponential_smoothing": { Name: "double_exponential_smoothing", ArgTypes: []ValueType{ValueTypeMatrix, ValueTypeScalar, ValueTypeScalar}, diff --git a/promql/promqltest/testdata/histograms.test b/promql/promqltest/testdata/histograms.test index 436390ee41..db7d5de230 100644 --- a/promql/promqltest/testdata/histograms.test +++ b/promql/promqltest/testdata/histograms.test @@ -598,6 +598,40 @@ eval instant at 50m histogram_quantile(1, testhistogram3_bucket) {start="positive"} 1 {start="negative"} -0.1 +eval instant at 50m histogram_quantiles(testhistogram3, "q", 0, 0.25, 0.5, 0.75, 1) + expect no_warn + {q="0.0", start="positive"} 0 + {q="0.0", start="negative"} -0.25 + {q="0.25", start="positive"} 0.055 + {q="0.25", start="negative"} -0.225 + {q="0.5", start="positive"} 0.125 + {q="0.5", start="negative"} -0.2 + {q="0.75", start="positive"} 0.45 + {q="0.75", start="negative"} -0.15 + {q="1.0", start="positive"} 1 + {q="1.0", start="negative"} -0.1 + +eval instant at 50m histogram_quantiles(testhistogram3_bucket, "q", 0, 0.25, 0.5, 0.75, 1) + expect no_warn + {q="0.0", 
start="positive"} 0 + {q="0.0", start="negative"} -0.25 + {q="0.25", start="positive"} 0.055 + {q="0.25", start="negative"} -0.225 + {q="0.5", start="positive"} 0.125 + {q="0.5", start="negative"} -0.2 + {q="0.75", start="positive"} 0.45 + {q="0.75", start="negative"} -0.15 + {q="1.0", start="positive"} 1 + {q="1.0", start="negative"} -0.1 + +# Break label set uniqueness. + +eval instant at 50m histogram_quantiles(testhistogram3, "start", 0, 0.25, 0.5, 0.75, 1) + expect fail + +eval instant at 50m histogram_quantiles(testhistogram3_bucket, "start", 0, 0.25, 0.5, 0.75, 1) + expect fail + # Quantile too low. eval instant at 50m histogram_quantile(-0.1, testhistogram) @@ -610,6 +644,16 @@ eval instant at 50m histogram_quantile(-0.1, testhistogram_bucket) {start="positive"} -Inf {start="negative"} -Inf +eval instant at 50m histogram_quantiles(testhistogram, "q", -0.1) + expect warn + {q="-0.1", start="positive"} -Inf + {q="-0.1", start="negative"} -Inf + +eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", -0.1) + expect warn + {q="-0.1", start="positive"} -Inf + {q="-0.1", start="negative"} -Inf + # Quantile too high. eval instant at 50m histogram_quantile(1.01, testhistogram) @@ -622,6 +666,16 @@ eval instant at 50m histogram_quantile(1.01, testhistogram_bucket) {start="positive"} +Inf {start="negative"} +Inf +eval instant at 50m histogram_quantiles(testhistogram, "q", 1.01) + expect warn + {q="1.01", start="positive"} +Inf + {q="1.01", start="negative"} +Inf + +eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", 1.01) + expect warn + {q="1.01", start="positive"} +Inf + {q="1.01", start="negative"} +Inf + # Quantile invalid. 
eval instant at 50m histogram_quantile(NaN, testhistogram) @@ -634,9 +688,22 @@ eval instant at 50m histogram_quantile(NaN, testhistogram_bucket) {start="positive"} NaN {start="negative"} NaN +eval instant at 50m histogram_quantiles(testhistogram, "q", NaN) + expect warn + {q="NaN", start="positive"} NaN + {q="NaN", start="negative"} NaN + +eval instant at 50m histogram_quantiles(testhistogram_bucket, "q", NaN) + expect warn + {q="NaN", start="positive"} NaN + {q="NaN", start="negative"} NaN + eval instant at 50m histogram_quantile(NaN, non_existent) expect warn msg: PromQL warning: quantile value should be between 0 and 1, got NaN +eval instant at 50m histogram_quantiles(non_existent, "q", NaN) + expect warn msg: PromQL warning: quantile value should be between 0 and 1, got NaN + # Quantile value in lowest bucket. eval instant at 50m histogram_quantile(0, testhistogram) @@ -967,6 +1034,12 @@ eval instant at 50m histogram_quantile(0.99, nonmonotonic_bucket) expect info {} 979.75 +eval instant at 50m histogram_quantiles(nonmonotonic_bucket, "q", 0.01, 0.5, 0.99) + expect info + {q="0.01"} 0.0045 + {q="0.5"} 8.5 + {q="0.99"} 979.75 + # Buckets with different representations of the same upper bound. eval instant at 50m histogram_quantile(0.5, rate(mixed_bucket[10m])) {instance="ins1", job="job1"} 0.15 @@ -1002,9 +1075,15 @@ load_with_nhcb 5m eval instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*_bucket"}) expect fail +eval instant at 50m histogram_quantiles({__name__=~"request_duration_seconds\\d*_bucket"}, "q", 0.99) + expect fail + eval instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*"}) expect fail +eval instant at 50m histogram_quantiles({__name__=~"request_duration_seconds\\d*"}, "q", 0.99) + expect fail + # Histogram with constant buckets. 
load_with_nhcb 1m const_histogram_bucket{le="0.0"} 1 1 1 1 1 @@ -1066,7 +1145,7 @@ eval instant at 10m histogram_sum(increase(histogram_with_reset[15m])) clear -# Test histogram_quantile and histogram_fraction with conflicting classic and native histograms. +# Test histogram_quantile(s) and histogram_fraction with conflicting classic and native histograms. load 1m series{host="a"} {{schema:0 sum:5 count:4 buckets:[9 2 1]}} series{host="a", le="0.1"} 2 @@ -1081,6 +1160,11 @@ eval instant at 0 histogram_quantile(0.8, series) expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series" # Should return no results. +eval instant at 0 histogram_quantiles(series, "q", 0.1, 0.2) + expect no_info + expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series" + # Should return no results. + eval instant at 0 histogram_fraction(-Inf, 1, series) expect no_info expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "series" diff --git a/promql/promqltest/testdata/native_histograms.test b/promql/promqltest/testdata/native_histograms.test index 3b497e5ff4..40789b295a 100644 --- a/promql/promqltest/testdata/native_histograms.test +++ b/promql/promqltest/testdata/native_histograms.test @@ -55,6 +55,10 @@ eval instant at 1m histogram_quantile(0.5, single_histogram) expect no_info {} 1.414213562373095 +eval instant at 1m histogram_quantiles(single_histogram, "q", 0.5) + expect no_info + {q="0.5"} 1.414213562373095 + clear # Repeat the same histogram 10 times. 
@@ -1605,6 +1609,11 @@ eval instant at 1m histogram_quantile(0.81, histogram_nan) {case="100% NaNs"} NaN {case="20% NaNs"} NaN +eval instant at 1m histogram_quantiles(histogram_nan, "q", 0.81) + expect info msg: PromQL info: input to histogram_quantile has NaN observations, result is NaN for metric name "histogram_nan" + {case="100% NaNs", q="0.81"} NaN + {case="20% NaNs", q="0.81"} NaN + eval instant at 1m histogram_quantile(0.8, histogram_nan{case="100% NaNs"}) expect info msg: PromQL info: input to histogram_quantile has NaN observations, result is NaN for metric name "histogram_nan" {case="100% NaNs"} NaN @@ -1891,6 +1900,9 @@ eval instant at 1m histogram_quantile(0.5, myHistogram2) eval instant at 1m histogram_quantile(0.5, mixedHistogram) expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram" +eval instant at 1m histogram_quantiles(mixedHistogram, "q", 0.5) + expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram" + clear # A counter reset only in a bucket. Sub-queries still need to detect @@ -1960,6 +1972,9 @@ eval instant at 1m histogram_count(histogram unless histogram_quantile(0.5, hist eval instant at 1m histogram_quantile(0.5, histogram unless histogram_count(histogram) == 0) {} 3.1748021039363987 +eval instant at 1m histogram_quantiles(histogram unless histogram_count(histogram) == 0, "q", 0.5) + {q="0.5"} 3.1748021039363987 + clear # Regression test for: diff --git a/web/ui/mantine-ui/src/promql/functionDocs.tsx b/web/ui/mantine-ui/src/promql/functionDocs.tsx index 4cc70a39e6..c7f744ba6f 100644 --- a/web/ui/mantine-ui/src/promql/functionDocs.tsx +++ b/web/ui/mantine-ui/src/promql/functionDocs.tsx @@ -1543,6 +1543,33 @@ const funcDocs: Record = {

), + histogram_quantiles: ( + <> +

+ + This function has to be enabled via the{" "} + feature flag + --enable-feature=promql-experimental-functions. + +

+ +

+ histogram_quantiles(v instant-vector, quantile_label string, φ_1 scalar, φ_2 scalar, ...){" "} + calculates multiple (between 1 and 10) φ-quantiles (0 ≤ φ ≤ 1) from a{" "} + classic histogram or from a native + histogram. Quantile calculation works the same way as in histogram_quantile(). The second argument + (a string) specifies the label name that is used to identify different quantiles in the query result. +

+ +
+        
+          histogram_quantiles(sum(rate(foo[1m])), "quantile", 0.9, 0.99) # => {"{"}quantile="0.9"
+          {"}"} 123
+          {"{"}quantile="0.99"{"}"} 128
+        
+      
+ + ), histogram_stddev: ( <>

diff --git a/web/ui/mantine-ui/src/promql/functionSignatures.ts b/web/ui/mantine-ui/src/promql/functionSignatures.ts index da21a2d4aa..837a271dce 100644 --- a/web/ui/mantine-ui/src/promql/functionSignatures.ts +++ b/web/ui/mantine-ui/src/promql/functionSignatures.ts @@ -69,6 +69,12 @@ export const functionSignatures: Record = { variadic: 0, returnType: valueType.vector, }, + histogram_quantiles: { + name: "histogram_quantiles", + argTypes: [valueType.vector, valueType.string, valueType.scalar, valueType.scalar], + variadic: 9, + returnType: valueType.vector, + }, histogram_stddev: { name: "histogram_stddev", argTypes: [valueType.vector], diff --git a/web/ui/module/codemirror-promql/src/complete/promql.terms.ts b/web/ui/module/codemirror-promql/src/complete/promql.terms.ts index 3670fffff7..68d7b06553 100644 --- a/web/ui/module/codemirror-promql/src/complete/promql.terms.ts +++ b/web/ui/module/codemirror-promql/src/complete/promql.terms.ts @@ -243,6 +243,12 @@ export const functionIdentifierTerms = [ info: 'Calculate quantiles from native histograms and from conventional histogram buckets', type: 'function', }, + { + label: 'histogram_quantiles', + detail: 'function', + info: 'Calculate multiple quantiles from native histograms and from conventional histogram buckets', + type: 'function', + }, { label: 'histogram_sum', detail: 'function', diff --git a/web/ui/module/codemirror-promql/src/types/function.ts b/web/ui/module/codemirror-promql/src/types/function.ts index cfbf3524b5..cc1c0524fb 100644 --- a/web/ui/module/codemirror-promql/src/types/function.ts +++ b/web/ui/module/codemirror-promql/src/types/function.ts @@ -44,6 +44,7 @@ import { HistogramCount, HistogramFraction, HistogramQuantile, + HistogramQuantiles, HistogramStdDev, HistogramStdVar, HistogramSum, @@ -306,6 +307,12 @@ const promqlFunctions: { [key: number]: PromQLFunction } = { variadic: 0, returnType: ValueType.vector, }, + [HistogramQuantiles]: { + name: 'histogram_quantiles', + argTypes: 
[ValueType.vector, ValueType.string, ValueType.scalar, ValueType.scalar], + variadic: 9, + returnType: ValueType.vector, + }, [HistogramStdDev]: { name: 'histogram_stddev', argTypes: [ValueType.vector], diff --git a/web/ui/module/lezer-promql/src/promql.grammar b/web/ui/module/lezer-promql/src/promql.grammar index 9308ad01be..e4308186bb 100644 --- a/web/ui/module/lezer-promql/src/promql.grammar +++ b/web/ui/module/lezer-promql/src/promql.grammar @@ -167,6 +167,7 @@ FunctionIdentifier { HistogramCount | HistogramFraction | HistogramQuantile | + HistogramQuantiles | HistogramStdDev | HistogramStdVar | HistogramSum | @@ -426,6 +427,7 @@ NumberDurationLiteralInDurationContext { HistogramCount { condFn<"histogram_count"> } HistogramFraction { condFn<"histogram_fraction"> } HistogramQuantile { condFn<"histogram_quantile"> } + HistogramQuantiles { condFn<"histogram_quantiles"> } HistogramStdDev { condFn<"histogram_stddev"> } HistogramStdVar { condFn<"histogram_stdvar"> } HistogramSum { condFn<"histogram_sum"> }