diff --git a/tsdb/block.go b/tsdb/block.go index ce75a56b27..8f8d5e0555 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -54,10 +54,6 @@ type IndexWriter interface { // The passed in values chained tuples of strings of the length of names. WriteLabelIndex(names []string, values []string) error - // WritePostings writes a postings list for a single label pair. - // The Postings here contain refs to the series that were added. - WritePostings(name, value string, it index.Postings) error - // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error diff --git a/tsdb/compact.go b/tsdb/compact.go index da304ed12a..fc34b9bef9 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -727,11 +727,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - // We fully rebuild the postings list index from merged series. var ( - postings = index.NewMemPostings() - values = map[string]stringset{} - i = uint64(0) + values = map[string]stringset{} + ref = uint64(0) ) if err := indexw.AddSymbols(allSymbols); err != nil { @@ -822,7 +820,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "write chunks") } - if err := indexw.AddSeries(i, lset, mergedChks...); err != nil { + if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil { return errors.Wrap(err, "add series") } @@ -846,9 +844,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } valset.set(l.Value) } - postings.Add(i, lset) - i++ + ref++ } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") @@ -866,11 +863,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - for _, l := range postings.SortedKeys() { - if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil { - return errors.Wrap(err, "write postings") - } - } return nil } diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index f2e1037567..422df7abb0 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -158,6 +158,14 @@ func NewDecbufUvarintAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Dec return dec } +// NewDecbufRaw returns a new decoding buffer of the given length. +func NewDecbufRaw(bs ByteSlice, length int) Decbuf { + if bs.Len() < length { + return Decbuf{E: ErrInvalidSize} + } + return Decbuf{B: bs.Range(0, length)} +} + func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) } func (d *Decbuf) Be32int() int { return int(d.Be32()) } func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) } @@ -260,6 +268,20 @@ func (d *Decbuf) Byte() byte { return x } +func (d *Decbuf) EatPadding() { + if d.E != nil { + return + } + for len(d.B) > 1 && d.B[0] == '\x00' { + d.B = d.B[1:] + } + if len(d.B) < 1 { + d.E = ErrInvalidSize + return + } + return +} + func (d *Decbuf) Err() error { return d.E } func (d *Decbuf) Len() int { return len(d.B) } func (d *Decbuf) Get() []byte { return d.B } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 61b80427c6..1303f531d7 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -119,17 +119,18 @@ type Writer struct { stage indexWriterStage // Reusable memory. - buf1 encoding.Encbuf - buf2 encoding.Encbuf - uint32s []uint32 + buf1 encoding.Encbuf + buf2 encoding.Encbuf - symbols map[string]uint32 // symbol offsets - seriesOffsets map[uint64]uint64 // offsets of series - labelIndexes []labelIndexHashEntry // label index offsets - postings []postingsHashEntry // postings lists offsets + symbols map[string]uint32 // symbol offsets + reverseSymbols map[uint32]string + labelIndexes []labelIndexHashEntry // label index offsets + postings []postingsHashEntry // postings lists offsets + labelNames map[string]struct{} // label names // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels + lastRef uint64 crc32 hash.Hash @@ -203,14 +204,14 @@ func NewWriter(fn string) (*Writer, error) { stage: idxStageNone, // Reusable memory. - buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - uint32s: make([]uint32, 0, 1<<15), + buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, + buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, // Caches. - symbols: make(map[string]uint32, 1<<13), - seriesOffsets: make(map[uint64]uint64, 1<<16), - crc32: newCRC32(), + symbols: make(map[string]uint32, 1<<13), + reverseSymbols: make(map[uint32]string, 1<<13), + labelNames: make(map[string]struct{}, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -274,10 +275,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error { case idxStageLabelIndex: w.toc.LabelIndices = w.pos - case idxStagePostings: - w.toc.Postings = w.pos - case idxStageDone: + w.toc.Postings = w.pos + if err := w.writePostings(); err != nil { + return err + } + w.toc.LabelIndicesTable = w.pos if err := w.writeLabelIndexesOffsetTable(); err != nil { return err @@ -312,8 +315,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("out-of-order series added with label set %q", lset) } - if _, ok := w.seriesOffsets[ref]; ok { - return errors.Errorf("series with reference %d already added", ref) + if ref < w.lastRef && len(w.lastSeries) != 0 { + return errors.Errorf("series with reference greater than %d already added", ref) } // We add padding to 16 bytes to increase the addressable space we get through 4 byte // series references. @@ -324,7 +327,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if w.pos%16 != 0 { return errors.Errorf("series write not 16-byte aligned at %d", w.pos) } - w.seriesOffsets[ref] = w.pos / 16 w.buf2.Reset() w.buf2.PutUvarint(len(lset)) @@ -335,6 +337,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Name) } + w.labelNames[l.Name] = struct{}{} w.buf2.PutUvarint32(index) index, ok = w.symbols[l.Value] @@ -374,6 +377,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta } w.lastSeries = append(w.lastSeries[:0], lset...) + w.lastRef = ref return nil } @@ -408,6 +412,7 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { for index, s := range symbols { w.symbols[s] = uint32(index) + w.reverseSymbols[uint32(index)] = s w.buf1.Reset() w.buf1.PutUvarintStr(s) w.buf1.WriteToHash(w.crc32) @@ -590,11 +595,94 @@ func (w *Writer) writeTOC() error { return w.write(w.buf1.Get()) } -func (w *Writer) WritePostings(name, value string, it Postings) error { - if err := w.ensureStage(idxStagePostings); err != nil { - return errors.Wrap(err, "ensure stage") - } +func (w *Writer) writePostings() error { + names := make([]string, 0, len(w.labelNames)) + for n := range w.labelNames { + names = append(names, n) + } + sort.Strings(names) + + if err := w.fbuf.Flush(); err != nil { + return err + } + f, err := fileutil.OpenMmapFile(w.f.Name()) + if err != nil { + return err + } + defer f.Close() + + // Write out the special all index. + offsets := []uint32{} + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) + d.B = d.B[w.toc.Series:] // dec.Skip not merged yet + for d.Len() > 0 { + d.EatPadding() + startPos := w.toc.LabelIndices - uint64(d.Len()) + if startPos%16 != 0 { + return errors.Errorf("series not 16-byte aligned at %d", startPos) + } + offsets = append(offsets, uint32(startPos/16)) + // Skip to next series. The 4 is for the CRC32. + skip := d.Uvarint() + 4 + d.B = d.B[skip:] + if err := d.Err(); err != nil { + return nil + } + } + w.writePosting("", "", offsets) + + for _, name := range names { + nameo := w.symbols[name] + postings := map[uint32][]uint32{} + + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) + d.B = d.B[w.toc.Series:] // dec.Skip not merged yet + for d.Len() > 0 { + d.EatPadding() + startPos := w.toc.LabelIndices - uint64(d.Len()) + l := d.Uvarint() // Length of this series in bytes. + startLen := d.Len() + + // See if this label name is in the series. + numLabels := d.Uvarint() + for i := 0; i < numLabels; i++ { + lno := uint32(d.Uvarint()) + lvo := uint32(d.Uvarint()) + + if lno == nameo { + if _, ok := postings[lvo]; !ok { + postings[lvo] = []uint32{} + } + postings[lvo] = append(postings[lvo], uint32(startPos/16)) + break + } + } + // Skip to next series. The 4 is for the CRC32. + skip := l - (startLen - d.Len()) + 4 + d.B = d.B[skip:] + if err := d.Err(); err != nil { + return nil + } + } + + // Write out postings for this label name. + values := make([]uint32, 0, len(postings)) + for v := range postings { + values = append(values, v) + + } + // Symbol numbers are in order, so the strings will also be in order. + sort.Sort(uint32slice(values)) + for _, v := range values { + w.writePosting(name, w.reverseSymbols[v], postings[v]) + } + + } + return nil +} + +func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. if err := w.addPadding(4); err != nil { return err @@ -606,26 +694,6 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { offset: w.pos, }) - // Order of the references in the postings list does not imply order - // of the series references within the persisted block they are mapped to. - // We have to sort the new references again. - refs := w.uint32s[:0] - - for it.Next() { - offset, ok := w.seriesOffsets[it.At()] - if !ok { - return errors.Errorf("%p series for reference %d not found", w, it.At()) - } - if offset > (1<<32)-1 { - return errors.Errorf("series offset %d exceeds 4 bytes", offset) - } - refs = append(refs, uint32(offset)) - } - if err := it.Err(); err != nil { - return err - } - sort.Sort(uint32slice(refs)) - startPos := w.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { @@ -634,21 +702,23 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { w.crc32.Reset() w.buf1.Reset() - w.buf1.PutBE32int(len(refs)) + w.buf1.PutBE32int(len(offs)) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } - for _, r := range refs { + for _, off := range offs { + if off > (1<<32)-1 { + return errors.Errorf("series offset %d exceeds 4 bytes", off) + } w.buf1.Reset() - w.buf1.PutBE32(r) + w.buf1.PutBE32(off) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } } - w.uint32s = refs // Write out the length. w.buf1.Reset() diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 95611d093e..4328b26c32 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -49,6 +49,7 @@ func newMockIndex() mockIndex { postings: make(map[labels.Label][]uint64), symbols: make(map[string]struct{}), } + ix.postings[allPostingsKey] = []uint64{} return ix } @@ -63,7 +64,12 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) for _, lbl := range l { m.symbols[lbl.Name] = struct{}{} m.symbols[lbl.Value] = struct{}{} + if _, ok := m.postings[lbl]; !ok { + m.postings[lbl] = []uint64{} + } + m.postings[lbl] = append(m.postings[lbl], ref) } + m.postings[allPostingsKey] = append(m.postings[allPostingsKey], ref) s := series{l: l} // Actual chunk data is not stored in the index. @@ -86,19 +92,6 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error { return nil } -func (m mockIndex) WritePostings(name, value string, it Postings) error { - l := labels.Label{Name: name, Value: value} - if _, ok := m.postings[l]; ok { - return errors.Errorf("postings for %s already added", l) - } - ep, err := ExpandPostings(it) - if err != nil { - return err - } - m.postings[l] = ep - return nil -} - func (m mockIndex) Close() error { return nil } @@ -116,7 +109,7 @@ func (m mockIndex) Postings(name string, values ...string) (Postings, error) { p := []Postings{} for _, value := range values { l := labels.Label{Name: name, Value: value} - p = append(p, NewListPostings(m.postings[l])) + p = append(p, m.SortedPostings(NewListPostings(m.postings[l]))) } return Merge(p...), nil } @@ -217,7 +210,8 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.AddSeries(3, series[2])) testutil.Ok(t, iw.AddSeries(4, series[3])) - err = iw.WritePostings("a", "1", newListPostings(1, 2, 3, 4)) + err = iw.WriteLabelIndex([]string{"a"}, []string{"1"}) + err = iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"}) testutil.Ok(t, err) testutil.Ok(t, iw.Close()) @@ -271,9 +265,7 @@ func TestPostingsMany(t *testing.T) { for i, s := range series { testutil.Ok(t, iw.AddSeries(uint64(i), s)) } - for i, s := range series { - testutil.Ok(t, iw.WritePostings("i", s.Get("i"), newListPostings(uint64(i)))) - } + err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"}) testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -414,20 +406,6 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals)) } - all := make([]uint64, len(lbls)) - for i := range all { - all[i] = uint64(i) - } - err = iw.WritePostings("", "", newListPostings(all...)) - testutil.Ok(t, err) - testutil.Ok(t, mi.WritePostings("", "", newListPostings(all...))) - - for _, l := range postings.SortedKeys() { - err := iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) - testutil.Ok(t, err) - mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) - } - err = iw.Close() testutil.Ok(t, err) @@ -457,7 +435,7 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Equals(t, explset, lset) testutil.Equals(t, expchks, chks) } - testutil.Assert(t, expp.Next() == false, "Unexpected Next() for "+p.Name+" "+p.Value) + testutil.Assert(t, expp.Next() == false, "Expected no more postings for %q=%q", p.Name, p.Value) testutil.Ok(t, gotp.Err()) } diff --git a/tsdb/mocks_test.go b/tsdb/mocks_test.go index d9b1056563..241802ec23 100644 --- a/tsdb/mocks_test.go +++ b/tsdb/mocks_test.go @@ -17,7 +17,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" ) @@ -62,9 +61,9 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk return nil } -func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil } -func (mockIndexWriter) WritePostings(name, value string, it index.Postings) error { return nil } -func (mockIndexWriter) Close() error { return nil } +func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil } +func (mockIndexWriter) WritePostings() error { return nil } +func (mockIndexWriter) Close() error { return nil } type mockBReader struct { ir IndexReader