diff --git a/block.go b/block.go index 9bb5d6ebb8..3e7b5ad247 100644 --- a/block.go +++ b/block.go @@ -11,8 +11,8 @@ import ( "github.com/pkg/errors" ) -// Block handles reads against a Block of time series data. -type Block interface { +// DiskBlock handles reads against a Block of time series data. +type DiskBlock interface { // Directory where block data is stored. Dir() string @@ -29,6 +29,32 @@ type Block interface { Close() error } +// Block is an interface to a DiskBlock that can also be queried. +type Block interface { + DiskBlock + // Queryable +} + +// HeadBlock is a regular block that can still be appended to. +type HeadBlock interface { + DiskBlock + Appendable +} + +// Appendable defines an entity to which data can be appended. +type Appendable interface { + // Appender returns a new Appender against an underlying store. + Appender() Appender + + // Busy returns whether there are any currently active appenders. + Busy() bool +} + +// Queryable defines an entity which provides a Querier. +type Queryable interface { + Queryable() Querier +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. diff --git a/chunks.go b/chunks.go index b3aaeb206d..c628108b9a 100644 --- a/chunks.go +++ b/chunks.go @@ -15,7 +15,7 @@ import ( ) const ( - // MagicSeries 4 bytes at the head of series file. + // MagicChunks 4 bytes at the head of series file. MagicChunks = 0x85BD40DD ) diff --git a/db.go b/db.go index 4de657a397..e84a9228ad 100644 --- a/db.go +++ b/db.go @@ -11,7 +11,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "unsafe" @@ -94,15 +93,14 @@ type DB struct { // Mutex for that must be held when modifying the general // block layout. - mtx sync.RWMutex - persisted []*persistedBlock - seqBlocks map[int]Block + mtx sync.RWMutex + blocks []Block + // seqBlocks map[int]Block // Mutex that must be held when modifying just the head blocks // or the general layout. headmtx sync.RWMutex - heads []*headBlock - headGen uint8 + heads []HeadBlock compactor Compactor @@ -237,11 +235,11 @@ func (db *DB) retentionCutoff() (bool, error) { // We don't count the span covered by head blocks towards the // retention time as it generally makes up a fraction of it. - if len(db.persisted) == 0 { + if len(db.blocks)-len(db.heads) == 0 { return false, nil } - last := db.persisted[len(db.persisted)-1] + last := db.blocks[len(db.blocks)-len(db.heads)-1] mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) return retentionCutoff(db.dir, mint) @@ -252,7 +250,7 @@ func (db *DB) compact() (changes bool, err error) { // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. - var singles []*headBlock + var singles []Block // Collect head blocks that are ready for compaction. Write them after // returning the lock to not block Appenders. @@ -263,7 +261,7 @@ func (db *DB) compact() (changes bool, err error) { // Blocks that won't be appendable when instantiating a new appender // might still have active appenders on them. // Abort at the first one we encounter. - if atomic.LoadUint64(&h.activeWriters) > 0 { + if h.Busy() { break } singles = append(singles, h) @@ -355,6 +353,15 @@ func retentionCutoff(dir string, mint int64) (bool, error) { return changes, fileutil.Fsync(df) } +func (db *DB) seqBlock(i int) (Block, bool) { + for _, b := range db.blocks { + if b.Meta().Sequence == i { + return b, true + } + } + return nil, false +} + func (db *DB) reloadBlocks() error { var cs []io.Closer defer closeAll(cs...) @@ -371,8 +378,8 @@ func (db *DB) reloadBlocks() error { } var ( metas []*BlockMeta - persisted []*persistedBlock - heads []*headBlock + blocks []Block + heads []HeadBlock seqBlocks = make(map[int]Block, len(dirs)) ) @@ -385,7 +392,7 @@ func (db *DB) reloadBlocks() error { } for i, meta := range metas { - b, ok := db.seqBlocks[meta.Sequence] + b, ok := db.seqBlock(meta.Sequence) if meta.Compaction.Generation == 0 { if !ok { @@ -397,7 +404,7 @@ func (db *DB) reloadBlocks() error { if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } - heads = append(heads, b.(*headBlock)) + heads = append(heads, b.(HeadBlock)) } else { if !ok || meta.ULID != b.Meta().ULID { b, err = newPersistedBlock(dirs[i]) @@ -405,22 +412,21 @@ func (db *DB) reloadBlocks() error { return errors.Wrapf(err, "open persisted block %s", dirs[i]) } } - persisted = append(persisted, b.(*persistedBlock)) } seqBlocks[meta.Sequence] = b + blocks = append(blocks, b) } // Close all blocks that we no longer need. They are closed after returning all // locks to avoid questionable locking order. - for seq, b := range db.seqBlocks { - if nb, ok := seqBlocks[seq]; !ok || nb != b { + for _, b := range db.blocks { + if nb := seqBlocks[b.Meta().Sequence]; nb != b { cs = append(cs, b) } } - db.seqBlocks = seqBlocks - db.persisted = persisted + db.blocks = blocks db.heads = heads return nil @@ -436,12 +442,10 @@ func (db *DB) Close() error { var g errgroup.Group - for _, pb := range db.persisted { + // blocks also contains all head blocks. + for _, pb := range db.blocks { g.Go(pb.Close) } - for _, hb := range db.heads { - g.Go(hb.Close) - } var merr MultiError @@ -459,54 +463,59 @@ func (db *DB) Appender() Appender { // Only instantiate appender after returning the headmtx to avoid // questionable locking order. db.headmtx.RLock() - app := db.appendable() - heads := make([]*headBlock, len(app)) - copy(heads, app) - db.headmtx.RUnlock() - for _, b := range heads { - a.heads = append(a.heads, b.Appender().(*headAppender)) + for _, b := range app { + a.heads = append(a.heads, &metaAppender{ + meta: b.Meta(), + app: b.Appender().(*headAppender), + }) } return a } type dbAppender struct { - db *DB - heads []*headAppender + db *DB + heads []*metaAppender + samples int } +type metaAppender struct { + meta BlockMeta + app Appender +} + func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { h, err := a.appenderFor(t) if err != nil { return 0, err } - ref, err := h.Add(lset, t, v) + ref, err := h.app.Add(lset, t, v) if err != nil { return 0, err } a.samples++ - return ref | (uint64(h.generation) << 40), nil + // Store last byte of sequence number in 3rd byte of refernece. + return ref | (uint64(h.meta.Sequence^0xff) << 40), nil } func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { - // We store the head generation in the 4th byte and use it to reject - // stale references. - gen := uint8((ref << 16) >> 56) + // Load the head last byte of the head sequence from the 3rd byte of the + // reference number. + gen := (ref << 16) >> 56 h, err := a.appenderFor(t) if err != nil { return err } - // If the reference pointed into a previous block, we cannot - // use it to append the sample. - if h.generation != gen { + // If the last byte of the sequence does not add up, the reference is not valid. + if uint64(h.meta.Sequence^0xff) != gen { return ErrNotFound } - if err := h.AddFast(ref, t, v); err != nil { + if err := h.app.AddFast(ref, t, v); err != nil { return err } @@ -516,12 +525,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // appenderFor gets the appender for the head containing timestamp t. // If the head block doesn't exist yet, it gets created. -func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { +func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { // If there's no fitting head block for t, ensure it gets created. if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.headmtx.Lock() - var newHeads []*headBlock + var newHeads []HeadBlock if err := a.db.ensureHead(t); err != nil { a.db.headmtx.Unlock() @@ -532,7 +541,7 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { } else { maxSeq := a.heads[len(a.heads)-1].meta.Sequence for _, b := range a.db.appendable() { - if b.meta.Sequence > maxSeq { + if b.Meta().Sequence > maxSeq { newHeads = append(newHeads, b) } } @@ -543,7 +552,10 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { // Instantiate appenders after returning headmtx to avoid questionable // locking order. for _, b := range newHeads { - a.heads = append(a.heads, b.Appender().(*headAppender)) + a.heads = append(a.heads, &metaAppender{ + app: b.Appender(), + meta: b.Meta(), + }) } } for i := len(a.heads) - 1; i >= 0; i-- { @@ -570,11 +582,12 @@ func (db *DB) ensureHead(t int64) error { for { h := db.heads[len(db.heads)-1] + m := h.Meta() // If t doesn't exceed the range of heads blocks, there's nothing to do. - if t < h.meta.MaxTime { + if t < m.MaxTime { return nil } - if _, err := db.cut(h.meta.MaxTime); err != nil { + if _, err := db.cut(m.MaxTime); err != nil { return err } } @@ -584,7 +597,7 @@ func (a *dbAppender) Commit() error { var merr MultiError for _, h := range a.heads { - merr.Add(h.Commit()) + merr.Add(h.app.Commit()) } a.db.mtx.RUnlock() @@ -598,18 +611,22 @@ func (a *dbAppender) Rollback() error { var merr MultiError for _, h := range a.heads { - merr.Add(h.Rollback()) + merr.Add(h.app.Rollback()) } a.db.mtx.RUnlock() return merr.Err() } -func (db *DB) appendable() []*headBlock { - if len(db.heads) <= db.opts.AppendableBlocks { - return db.heads +// appendable returns a copy of a slice of HeadBlocks that can still be appended to. +func (db *DB) appendable() []HeadBlock { + var i int + app := make([]HeadBlock, 0, db.opts.AppendableBlocks) + + if len(db.heads) > db.opts.AppendableBlocks { + i = len(db.heads) - db.opts.AppendableBlocks } - return db.heads[len(db.heads)-db.opts.AppendableBlocks:] + return append(app, db.heads[i:]...) } func intervalOverlap(amin, amax, bmin, bmax int64) bool { @@ -631,13 +648,7 @@ func intervalContains(min, max, t int64) bool { func (db *DB) blocksForInterval(mint, maxt int64) []Block { var bs []Block - for _, b := range db.persisted { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { - bs = append(bs, b) - } - } - for _, b := range db.heads { + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { bs = append(bs, b) @@ -661,11 +672,8 @@ func (db *DB) cut(mint int64) (*headBlock, error) { return nil, err } + db.blocks = append(db.blocks, newHead) db.heads = append(db.heads, newHead) - db.seqBlocks[seq] = newHead - db.headGen++ - - newHead.generation = db.headGen select { case db.compactc <- struct{}{}: diff --git a/head.go b/head.go index eed2ff2222..f801bc69c8 100644 --- a/head.go +++ b/head.go @@ -36,10 +36,9 @@ var ( // headBlock handles reads and writes of time series data within a time window. type headBlock struct { - mtx sync.RWMutex - dir string - generation uint8 - wal *WAL + mtx sync.RWMutex + dir string + wal *WAL activeWriters uint64 closed bool @@ -184,6 +183,10 @@ func (h *headBlock) Appender() Appender { return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} } +func (h *headBlock) Busy() bool { + return atomic.LoadUint64(&h.activeWriters) > 0 +} + var headPool = sync.Pool{} func getHeadAppendBuffer() []refdSample { @@ -265,6 +268,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { // sample sequence is valid. // We also have to revalidate it as we switch locks an create // the new series. + } else if ref > uint64(len(a.series)) { + return ErrNotFound } else { ms := a.series[int(ref)] if ms == nil {