From 42fa3422295408fa71c97932aa5d1e46cc327219 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 1 Feb 2017 21:31:35 +0100 Subject: [PATCH] Improve multi-head handling This improves handling of multiple head blocks. Configuration is simplified to specify the number of concurrently appendable blocks. --- db.go | 126 ++++++++++++++++++++++++---------------------------------- 1 file changed, 51 insertions(+), 75 deletions(-) diff --git a/db.go b/db.go index 2f577d020e..0baa1bc5f3 100644 --- a/db.go +++ b/db.go @@ -30,7 +30,7 @@ var DefaultOptions = &Options{ WALFlushInterval: 5 * time.Second, MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds MaxBlockDuration: 48 * 60 * 60 * 1000, // 1 day in milliseconds - GracePeriod: 2 * 60 * 60 * 1000, // 2 hours in milliseconds + AppendableBlocks: 2, } // Options of the DB storage. @@ -45,9 +45,12 @@ type Options struct { // The maximum timestamp range of compacted blocks. MaxBlockDuration uint64 - // Time window between the highest timestamp and the minimum timestamp - // that can still be appended. - GracePeriod uint64 + // Number of head blocks that can be appended to. + // Should be two or higher to prevent write errors in general scenarios. + // + // After a new block is started for timestamp t0 or higher, appends with + // timestamps as early as t0 - (n-1) * MinBlockDuration are valid. + AppendableBlocks int } // Appender allows appending a batch of data. It must be completed with a @@ -388,7 +391,6 @@ type dbAppender struct { func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { h, err := a.appenderFor(t) if err != nil { - fmt.Println("no appender") return 0, err } ref, err := h.Add(lset, t, v) @@ -401,7 +403,6 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) func (a *dbAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { h, err := a.appenderFor(t) if err != nil { - fmt.Println("no appender") return 0, err } ref, err := h.hashedAdd(hash, lset, t, v) @@ -420,33 +421,33 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { if err != nil { return err } - // fmt.Println("check gen", h.generation, gen) + // If the reference pointed into a previous block, we cannot + // use it to append the sample. if h.generation != gen { return ErrNotFound } - return h.AddFast(ref, t, v) } // appenderFor gets the appender for the head containing timestamp t. // If the head block doesn't exist yet, it gets created. func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { - if len(a.heads) == 0 { - if err := a.addNextHead(t); err != nil { + // If there's no fitting head block for t, ensure it gets created. + if len(a.heads) == 0 || t > a.heads[len(a.heads)-1].meta.MaxTime { + a.db.mtx.RUnlock() + if err := a.db.ensureHead(t); err != nil { + a.db.mtx.RLock() return nil, err } - return a.appenderFor(t) + a.db.mtx.RLock() + + a.heads = nil + for _, b := range a.db.appendable() { + a.heads = append(a.heads, b.Appender().(*headAppender)) + } } for i := len(a.heads) - 1; i >= 0; i-- { - h := a.heads[i] - - if t > h.meta.MaxTime { - if err := a.addNextHead(t); err != nil { - return nil, err - } - return a.appenderFor(t) - } - if t >= h.meta.MinTime { + if h := a.heads[i]; t >= h.meta.MinTime { return h, nil } } @@ -454,33 +455,30 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { return nil, ErrNotFound } -func (a *dbAppender) addNextHead(t int64) error { - a.db.mtx.RUnlock() - a.db.mtx.Lock() +func (db *DB) ensureHead(t int64) error { + db.mtx.Lock() + defer db.mtx.Unlock() - // We switched locks, validate that adding a head for the timestamp - // is still required. - if len(a.db.heads) > 1 { - h := a.db.heads[len(a.db.heads)-1] - if t <= h.meta.MaxTime { - a.heads = append(a.heads, h.Appender().(*headAppender)) - a.maxGen++ - a.db.mtx.Unlock() - a.db.mtx.RLock() - return nil + // Initial case for a new database: we must create the first + // AppendableBlocks-1 front padding heads. + if len(db.heads) == 0 { + for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- { + if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil { + return err + } } } - h, err := a.db.cut(t) - if err == nil { - a.heads = append(a.heads, h.Appender().(*headAppender)) - a.maxGen++ + for { + h := db.heads[len(db.heads)-1] + // If t doesn't exceed the range of heads blocks, there's nothing to do. + if t <= h.meta.MaxTime { + return nil + } + if _, err := db.cut(h.meta.MaxTime + 1); err != nil { + return err + } } - - a.db.mtx.Unlock() - a.db.mtx.RLock() - - return err } func (a *dbAppender) Commit() error { @@ -506,23 +504,10 @@ func (a *dbAppender) Rollback() error { } func (db *DB) appendable() []*headBlock { - if len(db.heads) == 0 { - return nil + if len(db.heads) <= db.opts.AppendableBlocks { + return db.heads } - var blocks []*headBlock - maxHead := db.heads[len(db.heads)-1] - - k := len(db.heads) - 2 - for i := k; i >= 0; i-- { - if db.heads[i].meta.MaxTime < maxHead.meta.MinTime-int64(db.opts.GracePeriod) { - break - } - k-- - } - for i := k + 1; i < len(db.heads); i++ { - blocks = append(blocks, db.heads[i]) - } - return blocks + return db.heads[len(db.heads)-db.opts.AppendableBlocks:] } func (db *DB) compactable() []Block { @@ -534,22 +519,13 @@ func (db *DB) compactable() []Block { blocks = append(blocks, pb) } - maxHead := db.heads[len(db.heads)-1] - - k := len(db.heads) - 2 - for i := k; i >= 0; i-- { - if db.heads[i].meta.MaxTime < maxHead.meta.MinTime-int64(db.opts.GracePeriod) { - break - } - k-- - } - for i, hb := range db.heads[:len(db.heads)-1] { - if i > k { - break - } - blocks = append(blocks, hb) + if len(db.heads) <= db.opts.AppendableBlocks { + return blocks } + for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { + blocks = append(blocks, h) + } return blocks } @@ -592,7 +568,6 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // will still be appendable for the configured grace period. func (db *DB) cut(mint int64) (*headBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) - 1 - fmt.Println("cut", mint, maxt) dir, seq, err := nextBlockDir(db.dir) if err != nil { @@ -608,8 +583,6 @@ func (db *DB) cut(mint int64) (*headBlock, error) { newHead.generation = db.headGen - fmt.Println("headlen", len(db.heads)) - select { case db.compactc <- struct{}{}: default: @@ -691,6 +664,9 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition l = log.NewLogfmtLogger(os.Stdout) l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) } + if opts.AppendableBlocks < 1 { + return nil, errors.Errorf("AppendableBlocks must be greater than 0") + } if err := os.MkdirAll(dir, 0777); err != nil { return nil, err