mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-18 18:25:24 -05:00
tsdb: Optimize LabelValues for sparse intersections (Fixes #14551)
Signed-off-by: Divyansh Mishra <divyanshmishra@Divyanshs-MacBook-Air-3.local>
This commit is contained in:
parent
3155c95c1f
commit
fcb68060cb
3 changed files with 95 additions and 11 deletions
|
|
@ -956,7 +956,7 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
|
|||
}
|
||||
if p.At() == h.at() {
|
||||
indexes = append(indexes, h.popIndex())
|
||||
} else if err := h.next(); err != nil {
|
||||
} else if err := h.seekHead(p.At()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
|
@ -999,20 +999,18 @@ func (h *postingsWithIndexHeap) popIndex() int {
|
|||
// at provides the storage.SeriesRef where root Postings is pointing at this moment.
|
||||
func (h postingsWithIndexHeap) at() storage.SeriesRef { return h[0].p.At() }
|
||||
|
||||
// next performs the Postings.Next() operation on the root of the heap, performing the related operation on the heap
|
||||
// and conveniently returning the result of calling Postings.Err() if the result of calling Next() was false.
|
||||
// If Next() succeeds, heap is fixed to move the root to its new position, according to its Postings.At() value.
|
||||
// If Next() returns fails and there's no error reported by Postings.Err(), then root is marked as removed and heap is fixed.
|
||||
func (h *postingsWithIndexHeap) next() error {
|
||||
// seekHead performs the Postings.Seek() operation on the root of the heap.
|
||||
// If the root is exhausted or fails, it is removed from the heap.
|
||||
func (h *postingsWithIndexHeap) seekHead(val storage.SeriesRef) error {
|
||||
pi := (*h)[0]
|
||||
next := pi.p.Next()
|
||||
next := pi.p.Seek(val)
|
||||
if next {
|
||||
heap.Fix(h, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := pi.p.Err(); err != nil {
|
||||
return fmt.Errorf("postings %d: %w", pi.index, err)
|
||||
return fmt.Errorf("seek postings %d: %w", pi.index, err)
|
||||
}
|
||||
h.popIndex()
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -1192,7 +1192,7 @@ func (p *postingsFailingAfterNthCall) Err() error {
|
|||
}
|
||||
|
||||
func TestPostingsWithIndexHeap(t *testing.T) {
|
||||
t.Run("iterate", func(t *testing.T) {
|
||||
t.Run("seekHead", func(t *testing.T) {
|
||||
h := postingsWithIndexHeap{
|
||||
{index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})},
|
||||
{index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})},
|
||||
|
|
@ -1205,7 +1205,7 @@ func TestPostingsWithIndexHeap(t *testing.T) {
|
|||
|
||||
for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} {
|
||||
require.Equal(t, expected, h.at())
|
||||
require.NoError(t, h.next())
|
||||
require.NoError(t, h.seekHead(h.at()+1))
|
||||
}
|
||||
require.True(t, h.empty())
|
||||
})
|
||||
|
|
@ -1223,7 +1223,7 @@ func TestPostingsWithIndexHeap(t *testing.T) {
|
|||
|
||||
for _, expected := range []storage.SeriesRef{1, 5, 10, 20} {
|
||||
require.Equal(t, expected, h.at())
|
||||
require.NoError(t, h.next())
|
||||
require.NoError(t, h.seekHead(h.at()+1))
|
||||
}
|
||||
require.Equal(t, storage.SeriesRef(25), h.at())
|
||||
node := heap.Pop(&h).(postingsWithIndex)
|
||||
|
|
|
|||
86
tsdb/label_values_bench_test.go
Normal file
86
tsdb/label_values_bench_test.go
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
// BenchmarkLabelValues_SlowPath benchmarks the performance of LabelValues when the matcher
|
||||
// is far ahead of the candidate posting list. This reproduces the performance regression
|
||||
// described in #14551 where dense candidates caused O(N) iteration instead of O(log N) seeking.
|
||||
func BenchmarkLabelValues_SlowPath(b *testing.B) {
|
||||
// Create a head with some data.
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkDirRoot = b.TempDir()
|
||||
h, err := NewHead(nil, nil, nil, nil, opts, nil)
|
||||
require.NoError(b, err)
|
||||
defer h.Close()
|
||||
|
||||
app := h.Appender(context.Background())
|
||||
// 1. Create a large number of series for a "candidate" label (e.g. "job").
|
||||
// We want these to NOT match the target matcher, but be candidates for a different label.
|
||||
// We use "job=api" and "instance=..."
|
||||
// We want the interaction to be:
|
||||
// LabelValues("instance", "job"="api")
|
||||
// "job"="api" will have 1 series at the END.
|
||||
// "instance" will have 100k series.
|
||||
|
||||
// Actually, let's stick to the reproduction case:
|
||||
// distinct values for "val1".
|
||||
// "b"="1" matcher.
|
||||
|
||||
// Create 100k series with the same label value ("common") but without the matcher label.
|
||||
// This results in a single large posting list for that value, simulating a dense candidate.
|
||||
for i := range 100000 {
|
||||
_, err := app.Append(0, labels.FromStrings("val1", "common", "extra", strconv.Itoa(i)), time.Now().UnixMilli(), 1)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
|
||||
// Create 1 series that matches the label "b=1", with a series ID greater than all previous ones.
|
||||
// This forces the intersection to skip over all 100k previous candidates.
|
||||
_, err = app.Append(0, labels.FromStrings("val1", "common", "b", "1"), time.Now().UnixMilli(), 1)
|
||||
require.NoError(b, err)
|
||||
|
||||
require.NoError(b, app.Commit())
|
||||
|
||||
ctx := context.Background()
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "b", "1")
|
||||
|
||||
// Use the correct method to access label values.
|
||||
idx, err := h.Index()
|
||||
require.NoError(b, err)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for b.Loop() {
|
||||
// "val1"="common" has 100k+1 postings.
|
||||
// "b=1" has 1 posting (the last one).
|
||||
vals, err := idx.LabelValues(ctx, "val1", nil, matcher)
|
||||
require.NoError(b, err)
|
||||
require.Equal(b, []string{"common"}, vals)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure wlog/wal needed for NewHead.
|
||||
var _ = wlog.WL{}
|
||||
Loading…
Reference in a new issue