diff --git a/block.go b/block.go index 40126a3624..365d2f6dc6 100644 --- a/block.go +++ b/block.go @@ -14,7 +14,7 @@ package tsdb import ( - "bufio" + "encoding/binary" "encoding/json" "fmt" "io/ioutil" @@ -155,6 +155,79 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } +func readTombstoneFile(dir string) (TombstoneReader, error) { + return newTombStoneReader(dir) +} + +func writeTombstoneFile(dir string, tr TombstoneReader) error { + path := filepath.Join(dir, tombstoneFilename) + tmp := path + ".tmp" + + f, err := os.Create(tmp) + if err != nil { + return err + } + + stoneOff := make(map[uint32]int64) // The map that holds the ref to offset vals. + refs := []uint32{} // Sorted refs. + + pos := int64(0) + buf := encbuf{b: make([]byte, 2*binary.MaxVarintLen64)} + for tr.Next() { + s := tr.At() + + refs = append(refs, s.ref) + stoneOff[s.ref] = pos + + // Write the ranges. + buf.reset() + buf.putVarint64(int64(len(s.ranges))) + n, err := f.Write(buf.get()) + if err != nil { + return err + } + pos += int64(n) + + for _, r := range s.ranges { + buf.reset() + buf.putVarint64(r.mint) + buf.putVarint64(r.maxt) + n, err = f.Write(buf.get()) + if err != nil { + return err + } + pos += int64(n) + } + } + + // Write the offset table. + buf.reset() + buf.putBE32int(len(refs)) + for _, ref := range refs { + buf.reset() + buf.putBE32(ref) + buf.putBE64int64(stoneOff[ref]) + _, err = f.Write(buf.get()) + if err != nil { + return err + } + } + + // Write the offset to the offset table. + buf.reset() + buf.putBE64int64(pos) + _, err = f.Write(buf.get()) + if err != nil { + return err + } + + if err := f.Close(); err != nil { + return err + } + + return renameFile(tmp, path) +} + type persistedBlock struct { dir string meta BlockMeta @@ -226,6 +299,9 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { Outer: for p.Next() { lset, chunks, err := ir.Series(p.At()) + if err != nil { + return err + } for _, abs := range absent { if lset.Get(abs) != "" { @@ -235,10 +311,10 @@ Outer: // XXX(gouthamve): Adjust mint and maxt to match the time-range in the chunks? for _, chk := range chunks { - if (mint <= chk.MinTime && maxt >= MinTime) || + if (mint <= chk.MinTime && maxt >= chk.MinTime) || (mint > chk.MinTime && mint <= chk.MaxTime) { vPostings = append(vPostings, p.At()) - continue + continue Outer } } } @@ -248,127 +324,239 @@ Outer: } // Merge the current and new tombstones. - tr := ir.tombstones() - stones := make([]rip, 0, len(vPostings)) - i := 0 - for tr.Next() { - stone := tr.At() - for stone.ref > vPostings[i] { - stones = append(stones, rip{ref: vPostings[i], mint: mint, maxt: maxt}) - i++ - } + tr := newMapTombstoneReader(ir.tombstones) + str := newSimpleTombstoneReader(vPostings, []trange{mint, maxt}) + tombreader := newMergedTombstoneReader(tr, str) - if stone.ref == vPostings[i] { - if stone.mint > mint { - stone.mint = mint - } - if stone.maxt < maxt { - stone.maxt = maxt - } - - stones = append(stones, stone) - continue - } - - stones = append(stones, stone) - } - - path := filepath.Join(pb.dir, tombstoneFilename) - tmp := path + ".tmp" - - f, err := os.Create(tmp) - if err != nil { - return err - } - - // TODO: Proper format and all. - buf := encbuf{b: make([]byte, 0, 20)} - fbuf := bufio.NewWriterSize(f, 20) - - for _, stone := range stones { - buf.reset() - buf.putBE32(stone.ref) - buf.putBE64int64(stone.mint) - buf.putBE64int64(stone.maxt) - - _, err := fbuf.Write(buf.get()) - if err != nil { - return err - } - } - - if err := fbuf.Flush(); err != nil { - return err - } - if err := f.Close(); err != nil { - return err - } - - return renameFile(tmp, path) + return writeTombstoneFile(pb.dir, tombreader) } -// rip (after rest-in-peace) holds the information on the posting and time-range +// stone holds the information on the posting and time-range // that is deleted. -type rip struct { - ref uint32 - mint, maxt int64 +type stone struct { + ref uint32 + ranges []trange } -// TODO(gouthamve): Move to cur and reduction in byte-array vis-a-vis BEPostings. +// TombstoneReader is the iterator over tombstones. +type TombstoneReader interface { + Next() bool + At() stone + Err() error +} + +var emptyTombstoneReader = newMapTombstoneReader(make(map[uint32][]trange)) + type tombstoneReader struct { - data []byte - idx int - len int + stones []byte + idx int + len int + + b []byte + err error } -func newTombStoneReader(data []byte) *tombstoneReader { - // TODO(gouthamve): Error handling. - return &tombstoneReader{data: data, idx: -1, len: len(data) / 20} +func newTombStoneReader(dir string) (*tombstoneReader, error) { + // TODO(gouthamve): MMAP? + b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) + if err != nil { + return nil, err + } + + offsetBytes := b[len(b)-8:] + d := &decbuf{b: offsetBytes} + off := d.be64int64() + if err := d.err(); err != nil { + return nil, err + } + + d = &decbuf{b: b[off:]} + numStones := d.be64int64() + if err := d.err(); err != nil { + return nil, err + } + + return &tombstoneReader{ + stones: b[off+8 : (off+8)+(numStones*12)], + idx: -1, + len: int(numStones), + + b: b, + }, nil } func (t *tombstoneReader) Next() bool { + if t.err != nil { + return false + } + t.idx++ return t.idx < t.len } -func (t *tombstoneReader) At() rip { - bytIdx := t.idx * (4 + 8 + 8) - dat := t.data[bytIdx : bytIdx+20] +func (t *tombstoneReader) At() stone { + bytIdx := t.idx * (4 + 8) + dat := t.stones[bytIdx : bytIdx+12] - db := &decbuf{b: dat} - ref := db.be32() - mint := db.be64int64() - maxt := db.be64int64() + d := &decbuf{b: dat} + ref := d.be32() + off := d.be64int64() - // TODO(gouthamve): Handle errors. - return rip{ref: ref, mint: mint, maxt: maxt} -} - -func (t *tombstoneReader) Seek(ref uint32) bool { - if s := t.At(); s.ref >= ref { - return true + d = &decbuf{b: t.b[off:]} + numRanges := d.varint64() + if err := d.err(); err != nil { + t.err = err + return stone{ref: ref} } - i := sort.Search(t.len-t.idx, func(i int) bool { - bytIdx := (t.idx + i) * 20 - dat := t.data[bytIdx : bytIdx+20] - - db := &decbuf{b: dat} - ref2 := db.be32() - if ref >= ref2 { - return true + dranges := make([]trange, 0, numRanges) + for i := 0; i < int(numRanges); i++ { + mint := d.varint64() + maxt := d.varint64() + if err := d.err(); err != nil { + t.err = err + return stone{ref: ref, ranges: dranges} } - }) - t.idx += idx - return t.idx < t.len + dranges = append(dranges, trange{mint, maxt}) + } + + return stone{ref: ref, ranges: dranges} } func (t *tombstoneReader) Err() error { + return t.err +} + +type mapTombstoneReader struct { + refs []uint32 + cur uint32 + + stones map[uint32][]trange +} + +func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader { + refs := make([]uint32, 0, len(ts)) + for k := range ts { + refs = append(refs, k) + } + sort.Sort(uint32slice(refs)) + return &mapTombstoneReader{stones: ts, refs: refs} +} + +func (t *mapTombstoneReader) Next() bool { + if len(t.refs) > 0 { + t.cur = t.refs[0] + return true + } + + t.cur = 0 + return false +} + +func (t *mapTombstoneReader) At() stone { + return stone{ref: t.cur, ranges: t.stones[t.cur]} +} + +func (t *mapTombstoneReader) Err() error { return nil } +type simpleTombstoneReader struct { + refs []uint32 + cur uint32 + + ranges []trange +} + +func newSimpleTombstoneReader(refs []uint32, drange []trange) *simpleTombstoneReader { + return &simpleTombstoneReader{refs: refs, ranges: drange} +} + +func (t *simpleTombstoneReader) Next() bool { + if len(t.refs) > 0 { + t.cur = t.refs[0] + return true + } + + t.cur = 0 + return false +} + +func (t *simpleTombstoneReader) At() stone { + return stone{ref: t.cur, ranges: t.ranges} +} + +func (t *simpleTombstoneReader) Err() error { + return nil +} + +type mergedTombstoneReader struct { + a, b TombstoneReader + cur stone + + initialized bool + aok, bok bool +} + +func newMergedTombstoneReader(a, b TombstoneReader) *mergedTombstoneReader { + return &mergedTombstoneReader{a: a, b: b} +} + +func (t *mergedTombstoneReader) Next() bool { + if !t.initialized { + t.aok = t.a.Next() + t.bok = t.b.Next() + t.initialized = true + } + + if !t.aok && !t.bok { + return false + } + + if !t.aok { + t.cur = t.b.At() + t.bok = t.b.Next() + return true + } + if !t.bok { + t.cur = t.a.At() + t.aok = t.a.Next() + return true + } + + acur, bcur := t.a.At(), t.b.At() + + if acur.ref < bcur.ref { + t.cur = acur + t.aok = t.a.Next() + } else if acur.ref > bcur.ref { + t.cur = bcur + t.bok = t.b.Next() + } else { + t.cur = acur + // Merge time ranges. + for _, r := range bcur.ranges { + acur.ranges = addNewInterval(acur.ranges, r) + } + t.aok = t.a.Next() + t.bok = t.b.Next() + } + return true +} + +func (t *mergedTombstoneReader) At() stone { + return t.cur +} + +func (t *mergedTombstoneReader) Err() error { + if t.a.Err() != nil { + return t.a.Err() + } + return t.b.Err() +} + func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } func walDir(dir string) string { return filepath.Join(dir, "wal") } diff --git a/chunks.go b/chunks.go index 1edb6d2ea1..5e89267e2b 100644 --- a/chunks.go +++ b/chunks.go @@ -55,6 +55,38 @@ func (tr trange) inBounds(t int64) bool { return t >= tr.mint && t <= tr.maxt } +// This adds the new time-range to the existing ones. +// The existing ones must be sorted. +func addNewInterval(existing []trange, n trange) []trange { + for i, r := range existing { + if r.inBounds(n.mint) { + if n.maxt > r.maxt { + existing[i].maxt = n.maxt + } + + return existing + } + if r.inBounds(n.maxt) { + if n.mint < r.maxt { + existing[i].mint = n.mint + } + + return existing + } + + if n.mint < r.mint { + newRange := existing[:i] + newRange = append(newRange, n) + newRange = append(newRange, existing[i:]...) + + return newRange + } + } + + existing = append(existing, n) + return existing +} + // writeHash writes the chunk encoding and raw data into the provided hash. func (cm *ChunkMeta) writeHash(h hash.Hash) error { if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil { @@ -114,7 +146,7 @@ Outer: return false } -func (it *deletedIterator) Err() { +func (it *deletedIterator) Err() error { return it.Err() } @@ -252,6 +284,27 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { maxLen := int64(binary.MaxVarintLen32) // The number of chunks. for _, c := range chks { maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding. + + // Remove the deleted parts. + if c.deleted { + // TODO(gouthamve): Try to do it in-place somehow? + chk := chunks.NewXORChunk() + app, err := chk.Appender() + if err != nil { + return err + } + it := c.Iterator() + for it.Next() { + ts, v := it.At() + app.Append(ts, v) + } + + if err := it.Err(); err != nil { + return err + } + c.Chunk = chk + } + maxLen += int64(len(c.Chunk.Bytes())) } newsz := w.n + maxLen diff --git a/encoding_helpers.go b/encoding_helpers.go index 486930d222..50189e0bbe 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -72,9 +72,9 @@ type decbuf struct { e error } -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } -func (d *decbuf) be64int64() int { return int64(d.be64()) } +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } +func (d *decbuf) be64int64() int64 { return int64(d.be64()) } func (d *decbuf) uvarintStr() string { l := d.uvarint64() diff --git a/head.go b/head.go index 1c7e90d97e..4b59a9d749 100644 --- a/head.go +++ b/head.go @@ -66,6 +66,8 @@ type HeadBlock struct { values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms + tombstones map[uint32][]trange + meta BlockMeta } @@ -94,6 +96,7 @@ func TouchHeadBlock(dir string, seq int, mint, maxt int64) error { }); err != nil { return err } + return renameFile(tmp, dir) } @@ -105,13 +108,14 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { } h := &HeadBlock{ - dir: dir, - wal: wal, - series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. - hashes: map[uint64][]*memSeries{}, - values: map[string]stringset{}, - postings: &memPostings{m: make(map[term][]uint32)}, - meta: *meta, + dir: dir, + wal: wal, + series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. + hashes: map[uint64][]*memSeries{}, + values: map[string]stringset{}, + postings: &memPostings{m: make(map[term][]uint32)}, + meta: *meta, + tombstones: make(map[uint32][]trange), } return h, h.init() } @@ -138,7 +142,20 @@ func (h *HeadBlock) init() error { h.meta.Stats.NumSamples++ } } - return errors.Wrap(r.Err(), "consume WAL") + if err := r.Err(); err != nil { + return errors.Wrap(err, "consume WAL") + } + + tr, err := readTombstoneFile(h.dir) + if err != nil { + return errors.Wrap(err, "read tombstones file") + } + + for tr.Next() { + s := tr.At() + h.tombstones[s.ref] = s.ranges + } + return errors.Wrap(err, "tombstones reader iteration") } // inBounds returns true if the given timestamp is within the valid @@ -206,7 +223,44 @@ func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } // Delete implements headBlock -func (h *HeadBlock) Delete(int64, int64, ...labels.Matcher) error { return nil } +func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error { + h.mtx.RLock() + + ir := h.Index() + + pr := newPostingsReader(ir) + p, absent := pr.Select(ms...) + + h.mtx.RUnlock() + + h.mtx.Lock() // We are modifying the tombstones here. + defer h.mtx.Unlock() + +Outer: + for p.Next() { + ref := p.At() + lset := h.series[ref].lset + for _, abs := range absent { + if lset.Get(abs) != "" { + continue Outer + } + } + + rs, ok := h.tombstones[ref] + if !ok { + h.tombstones[ref] = []trange{{mint, maxt}} + continue + } + + h.tombstones[ref] = addNewInterval(rs, trange{mint, maxt}) + } + + if p.Err() != nil { + return p.Err() + } + + return writeTombstoneFile(h.dir, newMapTombstoneReader(h.tombstones)) +} // Querier implements Queryable and headBlock func (h *HeadBlock) Querier(mint, maxt int64) Querier { @@ -527,6 +581,9 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error if int(ref) >= len(h.series) { return nil, nil, ErrNotFound } + + dranges, deleted := h.tombstones[ref] + s := h.series[ref] metas := make([]*ChunkMeta, 0, len(s.chunks)) @@ -538,6 +595,9 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error MinTime: c.minTime, MaxTime: c.maxTime, Ref: (uint64(ref) << 32) | uint64(i), + + deleted: deleted, + dranges: dranges, }) } diff --git a/index.go b/index.go index 051555dd21..3536468185 100644 --- a/index.go +++ b/index.go @@ -165,6 +165,10 @@ func newIndexWriter(dir string) (*indexWriter, error) { if err := iw.writeMeta(); err != nil { return nil, err } + // TODO(gouthamve): Figure out where this function goes, index or block. + if err := writeTombstoneFile(dir, emptyTombstoneReader); err != nil { + return nil, err + } return iw, nil } @@ -538,8 +542,7 @@ type indexReader struct { labels map[string]uint32 postings map[string]uint32 - // The underlying byte slice holding the tombstone data. - tomb []byte + tombstones map[uint32][]trange } var ( @@ -576,13 +579,17 @@ func newIndexReader(dir string) (*indexReader, error) { return nil, errors.Wrap(err, "read postings table") } - tf, err := openMmapFile(filepath.Join(dir, tombstoneFilename)) + tr, err := readTombstoneFile(dir) if err != nil { - return err + return r, err + } + r.tombstones = make(map[uint32][]trange) + for tr.Next() { + s := tr.At() + r.tombstones[s.ref] = s.ranges } - r.tomb = tf.b - return r, nil + return r, tr.Err() } func (r *indexReader) readTOC() error { @@ -750,17 +757,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { lbls = append(lbls, labels.Label{Name: ln, Value: lv}) } - // TODO: This sucks! Put tombstones in map. - tr := r.tombstones() - dmint, dmaxt := 0 - del := false - if tr.Seek(ref) { - s := tr.At() - if s.ref == ref { - del = true - dmint, dmaxt = s.mint, s.maxt - } - } + s, deleted := r.tombstones[ref] // Read the chunks meta data. k = int(d2.uvarint()) @@ -781,8 +778,8 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { MinTime: mint, MaxTime: maxt, - deleted: del, - dranges: []trange{{dmint, dmaxt}}, + deleted: deleted, + dranges: s, }) } @@ -814,10 +811,6 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { return newBigEndianPostings(d2.get()), nil } -func (r *indexReader) tombstones() *tombstoneReader { - return newTombStoneReader(r.tomb[:]) -} - type stringTuples struct { l int // tuple length s []string // flattened tuple entries