mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
Refactor: TSDB: small improvement to Postings (#17661)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
* [TESTS] TSDB: Check that ListPostings are sorted `newListPostings` is a convenient place to do this; move it into the test file because it's not needed for anything else. Also simplify the existing `SliceIsSorted` check. Signed-off-by: Bryan Boreham <bjboreham@gmail.com> * [COMMENTS] TSDB: Document that Postings are ordered. The description of `Seek()` implies they are, but it's better to be explicit. Signed-off-by: Bryan Boreham <bjboreham@gmail.com> * [REFACTOR] Unexport ListPostings type It's only used within the one package, and it would be surprising if some downstream code did rely on this type, given all of its members are unexported. Signed-off-by: Bryan Boreham <bjboreham@gmail.com> --------- Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
952efe77ad
commit
583bc01cc9
2 changed files with 59 additions and 55 deletions
|
|
@ -391,7 +391,7 @@ func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
|
|||
|
||||
for n, e := range p.m {
|
||||
for v, p := range e {
|
||||
if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil {
|
||||
if err := f(labels.Label{Name: n, Value: v}, NewListPostings(p)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -478,8 +478,8 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
|||
}
|
||||
|
||||
// Now `vals` only contains the values that matched, get their postings.
|
||||
its := make([]*ListPostings, 0, len(vals))
|
||||
lps := make([]ListPostings, len(vals))
|
||||
its := make([]*listPostings, 0, len(vals))
|
||||
lps := make([]listPostings, len(vals))
|
||||
p.mtx.RLock()
|
||||
e := p.m[name]
|
||||
for i, v := range vals {
|
||||
|
|
@ -488,7 +488,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
|||
// If we didn't let the mutex go, we'd have these postings here, but they would be pointing nowhere
|
||||
// because there would be a `MemPostings.Delete()` call waiting for the lock to delete these labels,
|
||||
// because the series were deleted already.
|
||||
lps[i] = ListPostings{list: refs}
|
||||
lps[i] = listPostings{list: refs}
|
||||
its = append(its, &lps[i])
|
||||
}
|
||||
}
|
||||
|
|
@ -500,13 +500,13 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
|||
|
||||
// Postings returns a postings iterator for the given label values.
|
||||
func (p *MemPostings) Postings(ctx context.Context, name string, values ...string) Postings {
|
||||
res := make([]*ListPostings, 0, len(values))
|
||||
lps := make([]ListPostings, len(values))
|
||||
res := make([]*listPostings, 0, len(values))
|
||||
lps := make([]listPostings, len(values))
|
||||
p.mtx.RLock()
|
||||
postingsMapForName := p.m[name]
|
||||
for i, value := range values {
|
||||
if lp := postingsMapForName[value]; lp != nil {
|
||||
lps[i] = ListPostings{list: lp}
|
||||
lps[i] = listPostings{list: lp}
|
||||
res = append(res, &lps[i])
|
||||
}
|
||||
}
|
||||
|
|
@ -518,12 +518,12 @@ func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string
|
|||
p.mtx.RLock()
|
||||
|
||||
e := p.m[name]
|
||||
its := make([]*ListPostings, 0, len(e))
|
||||
lps := make([]ListPostings, len(e))
|
||||
its := make([]*listPostings, 0, len(e))
|
||||
lps := make([]listPostings, len(e))
|
||||
i := 0
|
||||
for _, refs := range e {
|
||||
if len(refs) > 0 {
|
||||
lps[i] = ListPostings{list: refs}
|
||||
lps[i] = listPostings{list: refs}
|
||||
its = append(its, &lps[i])
|
||||
}
|
||||
i++
|
||||
|
|
@ -542,7 +542,7 @@ func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
|
|||
return res, p.Err()
|
||||
}
|
||||
|
||||
// Postings provides iterative access over a postings list.
|
||||
// Postings provides iterative access over an ordered list of SeriesRef.
|
||||
type Postings interface {
|
||||
// Next advances the iterator and returns true if another value was found.
|
||||
Next() bool
|
||||
|
|
@ -827,25 +827,23 @@ func (rp *removedPostings) Err() error {
|
|||
return rp.remove.Err()
|
||||
}
|
||||
|
||||
// ListPostings implements the Postings interface over a plain list.
|
||||
type ListPostings struct {
|
||||
// listPostings implements the Postings interface over a plain list.
|
||||
type listPostings struct {
|
||||
list []storage.SeriesRef
|
||||
cur storage.SeriesRef
|
||||
}
|
||||
|
||||
// NewListPostings creates a Postings from the supplied SeriesRefs, which must be in order.
|
||||
// The list slice passed in is retained.
|
||||
func NewListPostings(list []storage.SeriesRef) Postings {
|
||||
return newListPostings(list...)
|
||||
return &listPostings{list: list}
|
||||
}
|
||||
|
||||
func newListPostings(list ...storage.SeriesRef) *ListPostings {
|
||||
return &ListPostings{list: list}
|
||||
}
|
||||
|
||||
func (it *ListPostings) At() storage.SeriesRef {
|
||||
func (it *listPostings) At() storage.SeriesRef {
|
||||
return it.cur
|
||||
}
|
||||
|
||||
func (it *ListPostings) Next() bool {
|
||||
func (it *listPostings) Next() bool {
|
||||
if len(it.list) > 0 {
|
||||
it.cur = it.list[0]
|
||||
it.list = it.list[1:]
|
||||
|
|
@ -855,7 +853,7 @@ func (it *ListPostings) Next() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (it *ListPostings) Seek(x storage.SeriesRef) bool {
|
||||
func (it *listPostings) Seek(x storage.SeriesRef) bool {
|
||||
// If the current value satisfies, then return.
|
||||
if it.cur >= x {
|
||||
return true
|
||||
|
|
@ -877,12 +875,12 @@ func (it *ListPostings) Seek(x storage.SeriesRef) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (*ListPostings) Err() error {
|
||||
func (*listPostings) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Len returns the remaining number of postings in the list.
|
||||
func (it *ListPostings) Len() int {
|
||||
func (it *listPostings) Len() int {
|
||||
return len(it.list)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -62,9 +63,7 @@ func TestMemPostings_ensureOrder(t *testing.T) {
|
|||
|
||||
for _, e := range p.m {
|
||||
for _, l := range e {
|
||||
ok := sort.SliceIsSorted(l, func(i, j int) bool {
|
||||
return l[i] < l[j]
|
||||
})
|
||||
ok := slices.IsSorted(l)
|
||||
require.True(t, ok, "postings list %v is not sorted", l)
|
||||
}
|
||||
}
|
||||
|
|
@ -285,9 +284,16 @@ func consumePostings(p Postings) error {
|
|||
return p.Err()
|
||||
}
|
||||
|
||||
func newListPostings(list ...storage.SeriesRef) *listPostings {
|
||||
if !slices.IsSorted(list) {
|
||||
panic("newListPostings: list is not sorted")
|
||||
}
|
||||
return &listPostings{list: list}
|
||||
}
|
||||
|
||||
// Create ListPostings for a benchmark, collecting the original sets of references
|
||||
// so they can be reset without additional memory allocations.
|
||||
func createPostings(lps *[]*ListPostings, refs *[][]storage.SeriesRef, params ...storage.SeriesRef) {
|
||||
func createPostings(lps *[]*listPostings, refs *[][]storage.SeriesRef, params ...storage.SeriesRef) {
|
||||
var temp []storage.SeriesRef
|
||||
for i := 0; i < len(params); i += 3 {
|
||||
for j := params[i]; j < params[i+1]; j += params[i+2] {
|
||||
|
|
@ -299,7 +305,7 @@ func createPostings(lps *[]*ListPostings, refs *[][]storage.SeriesRef, params ..
|
|||
}
|
||||
|
||||
// Reset the ListPostings to their original values each time round the benchmark loop.
|
||||
func resetPostings(its []Postings, lps []*ListPostings, refs [][]storage.SeriesRef) {
|
||||
func resetPostings(its []Postings, lps []*listPostings, refs [][]storage.SeriesRef) {
|
||||
for j := range refs {
|
||||
lps[j].list = refs[j]
|
||||
its[j] = lps[j]
|
||||
|
|
@ -308,7 +314,7 @@ func resetPostings(its []Postings, lps []*ListPostings, refs [][]storage.SeriesR
|
|||
|
||||
func BenchmarkIntersect(t *testing.B) {
|
||||
t.Run("LongPostings1", func(bench *testing.B) {
|
||||
var lps []*ListPostings
|
||||
var lps []*listPostings
|
||||
var refs [][]storage.SeriesRef
|
||||
createPostings(&lps, &refs, 0, 10000000, 2)
|
||||
createPostings(&lps, &refs, 5000000, 5000100, 4, 5090000, 5090600, 4)
|
||||
|
|
@ -327,7 +333,7 @@ func BenchmarkIntersect(t *testing.B) {
|
|||
})
|
||||
|
||||
t.Run("LongPostings2", func(bench *testing.B) {
|
||||
var lps []*ListPostings
|
||||
var lps []*listPostings
|
||||
var refs [][]storage.SeriesRef
|
||||
createPostings(&lps, &refs, 0, 12500000, 1)
|
||||
createPostings(&lps, &refs, 7500000, 12500000, 1)
|
||||
|
|
@ -346,7 +352,7 @@ func BenchmarkIntersect(t *testing.B) {
|
|||
})
|
||||
|
||||
t.Run("ManyPostings", func(bench *testing.B) {
|
||||
var lps []*ListPostings
|
||||
var lps []*listPostings
|
||||
var refs [][]storage.SeriesRef
|
||||
for range 100 {
|
||||
createPostings(&lps, &refs, 1, 100, 1)
|
||||
|
|
@ -365,7 +371,7 @@ func BenchmarkIntersect(t *testing.B) {
|
|||
}
|
||||
|
||||
func BenchmarkMerge(t *testing.B) {
|
||||
var lps []*ListPostings
|
||||
var lps []*listPostings
|
||||
var refs [][]storage.SeriesRef
|
||||
|
||||
// Create 100000 matchers(k=100000), making sure all memory allocation is done before starting the loop.
|
||||
|
|
@ -378,7 +384,7 @@ func BenchmarkMerge(t *testing.B) {
|
|||
refs = append(refs, temp)
|
||||
}
|
||||
|
||||
its := make([]*ListPostings, len(refs))
|
||||
its := make([]*listPostings, len(refs))
|
||||
for _, nSeries := range []int{1, 10, 10000, 100000} {
|
||||
t.Run(strconv.Itoa(nSeries), func(bench *testing.B) {
|
||||
ctx := context.Background()
|
||||
|
|
@ -1229,78 +1235,78 @@ func TestPostingsWithIndexHeap(t *testing.T) {
|
|||
func TestListPostings(t *testing.T) {
|
||||
t.Run("empty list", func(t *testing.T) {
|
||||
p := NewListPostings(nil)
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
require.False(t, p.Next())
|
||||
require.False(t, p.Seek(10))
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
|
||||
t.Run("one posting", func(t *testing.T) {
|
||||
t.Run("next", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10})
|
||||
require.Equal(t, 1, p.(*ListPostings).Len())
|
||||
require.Equal(t, 1, p.(*listPostings).Len())
|
||||
require.True(t, p.Next())
|
||||
require.Equal(t, storage.SeriesRef(10), p.At())
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek less", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10})
|
||||
require.Equal(t, 1, p.(*ListPostings).Len())
|
||||
require.Equal(t, 1, p.(*listPostings).Len())
|
||||
require.True(t, p.Seek(5))
|
||||
require.Equal(t, storage.SeriesRef(10), p.At())
|
||||
require.True(t, p.Seek(5))
|
||||
require.Equal(t, storage.SeriesRef(10), p.At())
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek equal", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10})
|
||||
require.Equal(t, 1, p.(*ListPostings).Len())
|
||||
require.Equal(t, 1, p.(*listPostings).Len())
|
||||
require.True(t, p.Seek(10))
|
||||
require.Equal(t, storage.SeriesRef(10), p.At())
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek more", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10})
|
||||
require.Equal(t, 1, p.(*ListPostings).Len())
|
||||
require.Equal(t, 1, p.(*listPostings).Len())
|
||||
require.False(t, p.Seek(15))
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek after next", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10})
|
||||
require.Equal(t, 1, p.(*ListPostings).Len())
|
||||
require.Equal(t, 1, p.(*listPostings).Len())
|
||||
require.True(t, p.Next())
|
||||
require.False(t, p.Seek(15))
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("multiple postings", func(t *testing.T) {
|
||||
t.Run("next", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10, 20})
|
||||
require.Equal(t, 2, p.(*ListPostings).Len())
|
||||
require.Equal(t, 2, p.(*listPostings).Len())
|
||||
require.True(t, p.Next())
|
||||
require.Equal(t, storage.SeriesRef(10), p.At())
|
||||
require.True(t, p.Next())
|
||||
require.Equal(t, storage.SeriesRef(20), p.At())
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10, 20})
|
||||
require.Equal(t, 2, p.(*ListPostings).Len())
|
||||
require.Equal(t, 2, p.(*listPostings).Len())
|
||||
require.True(t, p.Seek(5))
|
||||
require.Equal(t, storage.SeriesRef(10), p.At())
|
||||
require.True(t, p.Seek(5))
|
||||
|
|
@ -1315,30 +1321,30 @@ func TestListPostings(t *testing.T) {
|
|||
require.Equal(t, storage.SeriesRef(20), p.At())
|
||||
require.False(t, p.Next())
|
||||
require.NoError(t, p.Err())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek lest than last", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
|
||||
require.Equal(t, 5, p.(*ListPostings).Len())
|
||||
require.Equal(t, 5, p.(*listPostings).Len())
|
||||
require.True(t, p.Seek(45))
|
||||
require.Equal(t, storage.SeriesRef(50), p.At())
|
||||
require.False(t, p.Next())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek exactly last", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
|
||||
require.Equal(t, 5, p.(*ListPostings).Len())
|
||||
require.Equal(t, 5, p.(*listPostings).Len())
|
||||
require.True(t, p.Seek(50))
|
||||
require.Equal(t, storage.SeriesRef(50), p.At())
|
||||
require.False(t, p.Next())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
t.Run("seek more than last", func(t *testing.T) {
|
||||
p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
|
||||
require.Equal(t, 5, p.(*ListPostings).Len())
|
||||
require.Equal(t, 5, p.(*listPostings).Len())
|
||||
require.False(t, p.Seek(60))
|
||||
require.False(t, p.Next())
|
||||
require.Equal(t, 0, p.(*ListPostings).Len())
|
||||
require.Equal(t, 0, p.(*listPostings).Len())
|
||||
})
|
||||
})
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue