From b08f82fa4e76481c42ab6c9bb4032e34ac7acd42 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 16 Dec 2016 12:13:17 +0100 Subject: [PATCH] Pre-select relevant chunks on series access. This adds interval metadata to indexed chunks. The queried interval is used to filter chunks when queried from the index to save unnecessary accesses of the chunks file. This is especially relevant for series that come and go often and larger files. --- db.go | 8 +++ head.go | 9 ++-- querier.go | 120 ++++++++++++++++++++++++++------------------ reader.go | 81 +++++++++++++++--------------- test/labels_test.go | 53 +++++++++++++------ writer.go | 38 +++++++------- 6 files changed, 183 insertions(+), 126 deletions(-) diff --git a/db.go b/db.go index e9abe73e75..42f01b84af 100644 --- a/db.go +++ b/db.go @@ -293,6 +293,10 @@ func intervalOverlap(amin, amax, bmin, bmax int64) bool { return false } +func intervalContains(min, max, t int64) bool { + return t >= min && t <= max +} + // blocksForRange returns all blocks within the shard that may contain // data for the given time range. func (s *Shard) blocksForInterval(mint, maxt int64) []block { @@ -312,6 +316,8 @@ func (s *Shard) blocksForInterval(mint, maxt int64) []block { bs = append(bs, s.head) } + fmt.Println("blocks for interval", bs) + return bs } @@ -393,6 +399,7 @@ type chunkDesc struct { chunk chunks.Chunk // Caching fields. + firsTimestamp int64 lastTimestamp int64 lastValue float64 @@ -405,6 +412,7 @@ func (cd *chunkDesc) append(ts int64, v float64) (err error) { if err != nil { return err } + cd.firsTimestamp = ts } if err := cd.app.Append(ts, v); err != nil { return err diff --git a/head.go b/head.go index c27d72958c..a358a8e1ce 100644 --- a/head.go +++ b/head.go @@ -76,15 +76,18 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) { } // Series returns the series for the given reference. -func (h *HeadBlock) Series(ref uint32) (Series, error) { +func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { cd, ok := h.index.forward[ref] if !ok { return nil, errNotFound } + if !intervalOverlap(cd.firsTimestamp, cd.lastTimestamp, mint, maxt) { + return nil, nil + } s := &series{ labels: cd.lset, - offsets: []ChunkOffset{ - {Value: h.stats.MinTime, Offset: 0}, + chunks: []ChunkMeta{ + {MinTime: h.stats.MinTime, Ref: 0}, }, chunk: func(ref uint32) (chunks.Chunk, error) { return cd.chunk, nil diff --git a/querier.go b/querier.go index 3a970d740f..33141c32a5 100644 --- a/querier.go +++ b/querier.go @@ -47,16 +47,13 @@ type Querier interface { type Series interface { // Labels returns the complete set of labels identifying the series. Labels() Labels + // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator // Ref() uint32 } -func inRange(x, mint, maxt int64) bool { - return x >= mint && x <= maxt -} - // querier merges query results from a set of shard querieres. type querier struct { mint, maxt int64 @@ -164,6 +161,8 @@ func (q *blockQuerier) Select(ms ...Matcher) SeriesSet { return &blockSeriesSet{ index: q.index, it: Intersect(its...), + mint: q.mint, + maxt: q.maxt, } } @@ -379,34 +378,67 @@ func (s *shardSeriesSet) Next() bool { // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { - index IndexReader - it Postings + index IndexReader + it Postings + mint, maxt int64 err error cur Series } func (s *blockSeriesSet) Next() bool { - // Get next reference from postings iterator. - if !s.it.Next() { + // Step through the postings iterator to find potential series. + // Resolving series may return nil if no applicable data for the + // time range exists and we can skip to the next series. + for s.it.Next() { + series, err := s.index.Series(s.it.Value(), s.mint, s.maxt) + if err != nil { + s.err = err + return false + } + if series != nil { + s.cur = series + return true + } + } + if s.it.Err() != nil { s.err = s.it.Err() - return false } - - // Resolve reference to series. - series, err := s.index.Series(s.it.Value()) - if err != nil { - s.err = err - return false - } - - s.cur = series - return true + return false } func (s *blockSeriesSet) Series() Series { return s.cur } func (s *blockSeriesSet) Err() error { return s.err } +type series struct { + labels Labels + chunks []ChunkMeta // in-order chunk refs + + chunk func(ref uint32) (chunks.Chunk, error) +} + +func (s *series) Labels() Labels { + return s.labels +} + +func (s *series) Iterator() SeriesIterator { + var cs []chunks.Chunk + var mints []int64 + + for _, co := range s.chunks { + c, err := s.chunk(co.Ref) + if err != nil { + panic(err) // TODO(fabxc): add error series iterator. + } + cs = append(cs, c) + mints = append(mints, co.MinTime) + } + + // TODO(fabxc): consider pushing chunk retrieval further down. In practice, we + // probably have to touch all chunks anyway and it doesn't matter. + return newChunkSeriesIterator(mints, cs) +} + // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. @@ -421,6 +453,7 @@ type SeriesIterator interface { Err() error } +// chainedSeries implements a series for a list of time-sorted series. type chainedSeries struct { series []Series } @@ -430,46 +463,29 @@ func (s *chainedSeries) Labels() Labels { } func (s *chainedSeries) Iterator() SeriesIterator { - it := &chainedSeriesIterator{ - series: make([]SeriesIterator, 0, len(s.series)), - } - for _, series := range s.series { - it.series = append(it.series, series.Iterator()) - } - return it + return &chainedSeriesIterator{series: s.series} } // chainedSeriesIterator implements a series iterater over a list // of time-sorted, non-overlapping iterators. type chainedSeriesIterator struct { - mints []int64 // minimum timestamps for each iterator - series []SeriesIterator // iterators in time order + series []Series // series in time order i int cur SeriesIterator } func (it *chainedSeriesIterator) Seek(t int64) bool { - x := sort.Search(len(it.mints), func(i int) bool { return it.mints[i] >= t }) - - if x == len(it.mints) { - return false - } - if it.mints[x] == t { - if x == 0 { - return false - } - x-- - } - - it.i = x - it.cur = it.series[x] - - for it.cur.Next() { - t0, _ := it.cur.Values() - if t0 >= t { - break + // We just scan the chained series sequentially as they are already + // pre-selected by relevant time and should be accessed sequentially anyway. + for i, s := range it.series[it.i:] { + cur := s.Iterator() + if !cur.Seek(t) { + continue } + it.cur = cur + it.i += i + return true } return false } @@ -486,7 +502,7 @@ func (it *chainedSeriesIterator) Next() bool { } it.i++ - it.cur = it.series[it.i] + it.cur = it.series[it.i].Iterator() return it.Next() } @@ -509,8 +525,12 @@ type chunkSeriesIterator struct { cur chunks.Iterator } -func newChunkSeriesIterator(cs []chunks.Chunk) *chunkSeriesIterator { +func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator { + if len(mints) != len(cs) { + panic("chunk references and chunks length don't match") + } return &chunkSeriesIterator{ + mints: mints, chunks: cs, i: 0, cur: cs[0].Iterator(), @@ -536,7 +556,7 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { for it.cur.Next() { t0, _ := it.cur.Values() if t0 >= t { - break + return true } } return false diff --git a/reader.go b/reader.go index 73e88de33a..ff5beb8a15 100644 --- a/reader.go +++ b/reader.go @@ -58,7 +58,7 @@ type IndexReader interface { Postings(name, value string) (Postings, error) // Series returns the series for the given reference. - Series(ref uint32) (Series, error) + Series(ref uint32, mint, maxt int64) (Series, error) } // StringTuples provides access to a sorted list of string tuples. @@ -231,7 +231,7 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { return st, nil } -func (r *indexReader) Series(ref uint32) (Series, error) { +func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { k, n := binary.Uvarint(r.b[ref:]) if n < 1 { return nil, errInvalidSize @@ -249,13 +249,15 @@ func (r *indexReader) Series(ref uint32) (Series, error) { b = b[n:] } - // Offests must occur in pairs representing name and value. + // Symbol offests must occur in pairs representing name and value. if len(offsets)&1 != 0 { return nil, errInvalidSize } - // TODO(fabxc): Fully materialize series for now. Figure out later if it + // TODO(fabxc): Fully materialize series symbols for now. Figure out later if it // makes sense to decode those lazily. + // If we use unsafe strings the there'll be no copy overhead. + // // The references are expected to be sorted and match the order of // the underlying strings. labels := make(Labels, 0, k) @@ -275,17 +277,28 @@ func (r *indexReader) Series(ref uint32) (Series, error) { }) } - // Read the chunk offsets. - k, n = binary.Uvarint(r.b[ref:]) + // Read the chunks meta data. + k, n = binary.Uvarint(b) if n < 1 { return nil, errInvalidSize } b = b[n:] - coffsets := make([]ChunkOffset, 0, k) + chunks := make([]ChunkMeta, 0, k) for i := 0; i < int(k); i++ { - v, n := binary.Varint(b) + firstTime, n := binary.Varint(b) + if n < 1 { + return nil, errInvalidSize + } + b = b[n:] + + // Terminate early if we exceeded the queried time range. + if firstTime > maxt { + break + } + + lastTime, n := binary.Varint(b) if n < 1 { return nil, errInvalidSize } @@ -297,18 +310,28 @@ func (r *indexReader) Series(ref uint32) (Series, error) { } b = b[n:] - coffsets = append(coffsets, ChunkOffset{ - Offset: uint32(o), - Value: v, + // Skip the chunk if it is before the queried time range. + if lastTime < mint { + continue + } + + chunks = append(chunks, ChunkMeta{ + Ref: uint32(o), + MinTime: firstTime, + MaxTime: lastTime, }) } - - s := &series{ - labels: labels, - offsets: coffsets, - chunk: r.series.Chunk, + // If no chunks applicable to the time range were found, the series + // can be skipped. + if len(chunks) == 0 { + return nil, nil } - return s, nil + + return &series{ + labels: labels, + chunks: chunks, + chunk: r.series.Chunk, + }, nil } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -344,30 +367,6 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { return &listIterator{list: l, idx: -1}, nil } -type series struct { - labels Labels - offsets []ChunkOffset // in-order chunk refs - chunk func(ref uint32) (chunks.Chunk, error) -} - -func (s *series) Labels() Labels { - return s.labels -} - -func (s *series) Iterator() SeriesIterator { - var cs []chunks.Chunk - - for _, co := range s.offsets { - c, err := s.chunk(co.Offset) - if err != nil { - panic(err) // TODO(fabxc): add error series iterator. - } - cs = append(cs, c) - } - - return newChunkSeriesIterator(cs) -} - type stringTuples struct { l int // tuple length s []string // flattened tuple entries diff --git a/test/labels_test.go b/test/labels_test.go index a802009b71..7349a75c06 100644 --- a/test/labels_test.go +++ b/test/labels_test.go @@ -2,6 +2,7 @@ package test import ( "bytes" + "crypto/rand" "testing" "github.com/fabxc/tsdb" @@ -55,40 +56,64 @@ func BenchmarkLabelSetAccess(b *testing.B) { } func BenchmarkStringBytesEquals(b *testing.B) { + randBytes := func(n int) ([]byte, []byte) { + buf1 := make([]byte, n) + if _, err := rand.Read(buf1); err != nil { + b.Fatal(err) + } + buf2 := make([]byte, n) + copy(buf1, buf2) + + return buf1, buf2 + } + cases := []struct { name string - a, b string + f func() ([]byte, []byte) }{ { name: "equal", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", + f: func() ([]byte, []byte) { + return randBytes(60) + }, }, { name: "1-flip-end", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,353", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + b2[59] ^= b2[59] + return b1, b2 + }, }, { name: "1-flip-middle", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj504cxf594802h875hgzz0h3586x8xz,359", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + b2[29] ^= b2[29] + return b1, b2 + }, }, { name: "1-flip-start", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "adfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + b2[0] ^= b2[0] + return b1, b2 + }, }, { name: "different-length", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,35", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + return b1, b2[:59] + }, }, } for _, c := range cases { b.Run(c.name+"-strings", func(b *testing.B) { - as, bs := c.a, c.b + ab, bb := c.f() + as, bs := string(ab), string(bb) b.SetBytes(int64(len(as))) var r bool @@ -100,7 +125,7 @@ func BenchmarkStringBytesEquals(b *testing.B) { }) b.Run(c.name+"-bytes", func(b *testing.B) { - ab, bb := []byte(c.a), []byte(c.b) + ab, bb := c.f() b.SetBytes(int64(len(ab))) var r bool @@ -112,7 +137,7 @@ func BenchmarkStringBytesEquals(b *testing.B) { }) b.Run(c.name+"-bytes-length-check", func(b *testing.B) { - ab, bb := []byte(c.a), []byte(c.b) + ab, bb := c.f() b.SetBytes(int64(len(ab))) var r bool diff --git a/writer.go b/writer.go index 83f8449eee..72dbdf8a38 100644 --- a/writer.go +++ b/writer.go @@ -92,13 +92,13 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e return err } - offsets := make([]ChunkOffset, 0, len(chks)) - lastTimestamp := w.baseTimestamp + metas := make([]ChunkMeta, 0, len(chks)) for _, cd := range chks { - offsets = append(offsets, ChunkOffset{ - Value: lastTimestamp, - Offset: uint32(w.n), + metas = append(metas, ChunkMeta{ + MinTime: cd.firsTimestamp, + MaxTime: cd.lastTimestamp, + Ref: uint32(w.n), }) n = binary.PutUvarint(b[:], uint64(len(cd.chunk.Bytes()))) @@ -111,7 +111,6 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e if err := w.write(wr, cd.chunk.Bytes()); err != nil { return err } - lastTimestamp = cd.lastTimestamp } if err := w.write(w.w, h.Sum(nil)); err != nil { @@ -119,7 +118,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e } if w.index != nil { - w.index.AddSeries(ref, lset, offsets...) + w.index.AddSeries(ref, lset, metas...) } return nil } @@ -141,9 +140,10 @@ func (w *seriesWriter) Close() error { return nil } -type ChunkOffset struct { - Value int64 - Offset uint32 +type ChunkMeta struct { + Ref uint32 + MinTime int64 + MaxTime int64 } type BlockStats struct { @@ -161,7 +161,7 @@ type IndexWriter interface { // of chunks that the index can reference. // The reference number is used to resolve a series against the postings // list iterator. It only has to be available during the write processing. - AddSeries(ref uint32, l Labels, o ...ChunkOffset) + AddSeries(ref uint32, l Labels, chunks ...ChunkMeta) // WriteStats writes final stats for the indexed block. WriteStats(BlockStats) error @@ -183,8 +183,8 @@ type IndexWriter interface { type indexWriterSeries struct { labels Labels - chunks []ChunkOffset // series file offset of chunks - offset uint32 // index file offset of series reference + chunks []ChunkMeta // series file offset of chunks + offset uint32 // index file offset of series reference } // indexWriter implements the IndexWriter interface for the standard @@ -242,7 +242,7 @@ func (w *indexWriter) writeMeta() error { return w.write(w.w, b[:]) } -func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) { +func (w *indexWriter) AddSeries(ref uint32, lset Labels, chunks ...ChunkMeta) { // Populate the symbol table from all label sets we have to reference. for _, l := range lset { w.symbols[l.Name] = 0 @@ -251,7 +251,7 @@ func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) w.series[ref] = &indexWriterSeries{ labels: lset, - chunks: offsets, + chunks: chunks, } } @@ -332,15 +332,17 @@ func (w *indexWriter) writeSeries() error { b = append(b, buf[:n]...) } - // Write skiplist to chunk offsets. + // Write chunks meta data including reference into chunk file. n = binary.PutUvarint(buf, uint64(len(s.chunks))) b = append(b, buf[:n]...) for _, c := range s.chunks { - n = binary.PutVarint(buf, c.Value) + n = binary.PutVarint(buf, c.MinTime) + b = append(b, buf[:n]...) + n = binary.PutVarint(buf, c.MaxTime) b = append(b, buf[:n]...) - n = binary.PutUvarint(buf, uint64(c.Offset)) + n = binary.PutUvarint(buf, uint64(c.Ref)) b = append(b, buf[:n]...) } }