diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 159f6416e2..937d1287b8 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -289,41 +289,67 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { // Delete removes all ids in the given map from the postings lists. func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) { - var keys, vals []string + // We will take an optimistic read lock for the entire method, + // and only lock for writing when we actually find something to delete. + // + // Each SeriesRef can appear in several Postings. + // To change each one, we need to know the label name and value that it is indexed under. + // We iterate over all label names, then for each name all values, + // and look for individual series to be deleted. + p.mtx.RLock() + defer p.mtx.RUnlock() // Collect all keys relevant for deletion once. New keys added afterwards // can by definition not be affected by any of the given deletes. - p.mtx.RLock() + keys := make([]string, 0, len(p.m)) + maxVals := 0 for n := range p.m { keys = append(keys, n) + if len(p.m[n]) > maxVals { + maxVals = len(p.m[n]) + } } - p.mtx.RUnlock() + vals := make([]string, 0, maxVals) for _, n := range keys { - p.mtx.RLock() + // Copy the values and iterate the copy: if we unlock in the loop below, + // another goroutine might modify the map while we are part-way through it. vals = vals[:0] for v := range p.m[n] { vals = append(vals, v) } - p.mtx.RUnlock() // For each posting we first analyse whether the postings list is affected by the deletes. - // If yes, we actually reallocate a new postings list. - for _, l := range vals { - // Only lock for processing one postings list so we don't block reads for too long. - p.mtx.Lock() - + // If no, we remove the label value from the vals list. + // This way we only need to Lock once later. + for i := 0; i < len(vals); { found := false - for _, id := range p.m[n][l] { + refs := p.m[n][vals[i]] + for _, id := range refs { if _, ok := deleted[id]; ok { + i++ found = true break } } + if !found { - p.mtx.Unlock() - continue + // Didn't match, bring the last value to this position, make the slice shorter and check again. + // The order of the slice doesn't matter as it comes from a map iteration. + vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1] } + } + + // If no label values have deleted ids, just continue. + if len(vals) == 0 { + continue + } + + // The only vals left here are the ones that contain deleted ids. + // Now we take the write lock and remove the ids. + p.mtx.RUnlock() + p.mtx.Lock() + for _, l := range vals { repl := make([]storage.SeriesRef, 0, len(p.m[n][l])) for _, id := range p.m[n][l] { @@ -336,13 +362,14 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) { } else { delete(p.m[n], l) } - p.mtx.Unlock() } - p.mtx.Lock() + + // Delete the key if we removed all values. if len(p.m[n]) == 0 { delete(p.m, n) } p.mtx.Unlock() + p.mtx.RLock() } } diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 2cbc14ac64..562aef457e 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -23,6 +23,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "github.com/grafana/regexp" @@ -1001,6 +1002,102 @@ func TestMemPostings_Delete(t *testing.T) { require.Empty(t, expanded, "expected empty postings, got %v", expanded) } +// BenchmarkMemPostings_Delete is quite heavy, so consider running it with +// -benchtime=10x or similar to get more stable and comparable results. +func BenchmarkMemPostings_Delete(b *testing.B) { + internedItoa := map[int]string{} + var mtx sync.RWMutex + itoa := func(i int) string { + mtx.RLock() + s, ok := internedItoa[i] + mtx.RUnlock() + if ok { + return s + } + mtx.Lock() + s = strconv.Itoa(i) + internedItoa[i] = s + mtx.Unlock() + return s + } + + const total = 1e6 + prepare := func() *MemPostings { + var ref storage.SeriesRef + next := func() storage.SeriesRef { + ref++ + return ref + } + + p := NewMemPostings() + nameValues := make([]string, 0, 100) + for i := 0; i < total; i++ { + nameValues = nameValues[:0] + + // A thousand labels like lbl_x_of_1000, each with total/1000 values + thousand := "lbl_" + itoa(i%1000) + "_of_1000" + nameValues = append(nameValues, thousand, itoa(i/1000)) + // A hundred labels like lbl_x_of_100, each with total/100 values. + hundred := "lbl_" + itoa(i%100) + "_of_100" + nameValues = append(nameValues, hundred, itoa(i/100)) + + if i < 100 { + ten := "lbl_" + itoa(i%10) + "_of_10" + nameValues = append(nameValues, ten, itoa(i%10)) + } + + p.Add(next(), labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)) + } + return p + } + + for _, refs := range []int{1, 100, 10_000} { + b.Run(fmt.Sprintf("refs=%d", refs), func(b *testing.B) { + for _, reads := range []int{0, 1, 10} { + b.Run(fmt.Sprintf("readers=%d", reads), func(b *testing.B) { + if b.N > total/refs { + // Just to make sure that benchmark still makes sense. + panic("benchmark not prepared") + } + + p := prepare() + stop := make(chan struct{}) + wg := sync.WaitGroup{} + for i := 0; i < reads; i++ { + wg.Add(1) + go func(i int) { + lbl := "lbl_" + itoa(i) + "_of_100" + defer wg.Done() + for { + select { + case <-stop: + return + default: + // Get a random value of this label. + p.Get(lbl, itoa(rand.Intn(10000))).Next() + } + } + }(i) + } + b.Cleanup(func() { + close(stop) + wg.Wait() + }) + + b.ResetTimer() + for n := 0; n < b.N; n++ { + deleted := map[storage.SeriesRef]struct{}{} + for i := 0; i < refs; i++ { + deleted[storage.SeriesRef(n*refs+i)] = struct{}{} + } + p.Delete(deleted) + } + }) + } + }) + } +} + func TestFindIntersectingPostings(t *testing.T) { t.Run("multiple intersections", func(t *testing.T) { p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50})