From a648ef52523849c3371c3b70b9ba552fb898d677 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 2 Jan 2017 16:58:47 +0100 Subject: [PATCH] Convert persister into function --- compact.go | 118 +++++++++++++++++++++++------------------------------ db.go | 12 +----- head.go | 12 +++--- 3 files changed, 59 insertions(+), 83 deletions(-) diff --git a/compact.go b/compact.go index fce8108ce0..36a3b9ad08 100644 --- a/compact.go +++ b/compact.go @@ -44,21 +44,18 @@ func (c *compactor) run() { if len(c.shard.persisted) < 2 { continue } - dir := fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now())) + var ( + dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now())) + a = c.shard.persisted[0] + b = c.shard.persisted[1] + ) - p, err := newPersister(dir) - if err != nil { - c.logger.Log("msg", "creating persister failed", "err", err) - continue - } - - if err := c.compact(p, c.shard.persisted[0], c.shard.persisted[1]); err != nil { + if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error { + return c.compact(indexw, chunkw, a, b) + }); err != nil { c.logger.Log("msg", "compaction failed", "err", err) continue } - if err := p.Close(); err != nil { - c.logger.Log("msg", "compaction failed", "err", err) - } } close(c.donec) } @@ -69,7 +66,7 @@ func (c *compactor) Close() error { return nil } -func (c *compactor) compact(p *persister, a, b block) error { +func (c *compactor) compact(indexw IndexWriter, chunkw SeriesWriter, a, b block) error { aall, err := a.index().Postings("", "") if err != nil { return err @@ -110,7 +107,7 @@ func (c *compactor) compact(p *persister, a, b block) error { for set.Next() { lset, chunks := set.At() - if err := p.chunkw.WriteSeries(i, lset, chunks); err != nil { + if err := chunkw.WriteSeries(i, lset, chunks); err != nil { return err } @@ -133,7 +130,7 @@ func (c *compactor) compact(p *persister, a, b block) error { return set.Err() } - if err := p.indexw.WriteStats(stats); err != nil { + if err := indexw.WriteStats(stats); err != nil { return err } @@ -144,13 +141,13 @@ func (c *compactor) compact(p *persister, a, b block) error { for x := range v { s = append(s, x) } - if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil { + if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { return err } } for t := range postings.m { - if err := p.indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { + if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { return err } } @@ -159,7 +156,7 @@ func (c *compactor) compact(p *persister, a, b block) error { for i := range all { all[i] = uint32(i) } - if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil { + if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { return err } @@ -291,68 +288,55 @@ func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) { return c.l, c.c } -type persister struct { - dir, tmpdir string - - chunkf, indexf *fileutil.LockedFile - - chunkw SeriesWriter - indexw IndexWriter -} - -func newPersister(dir string) (*persister, error) { - p := &persister{ - dir: dir, - tmpdir: dir + ".tmp", - } - var err error +func persist(dir string, write func(IndexWriter, SeriesWriter) error) error { + tmpdir := dir + ".tmp" // Write to temporary directory to make persistence appear atomic. - if fileutil.Exist(p.tmpdir) { - if err := os.RemoveAll(p.tmpdir); err != nil { - return nil, err + if fileutil.Exist(tmpdir) { + if err := os.RemoveAll(tmpdir); err != nil { + return err } } - if err := fileutil.CreateDirAll(p.tmpdir); err != nil { - return nil, err + if err := fileutil.CreateDirAll(tmpdir); err != nil { + return err } - p.chunkf, err = fileutil.LockFile(chunksFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666) + chunkf, err := fileutil.LockFile(chunksFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { - return nil, err + return err } - p.indexf, err = fileutil.LockFile(indexFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666) + indexf, err := fileutil.LockFile(indexFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { - return nil, err - } - - p.indexw = newIndexWriter(p.indexf) - p.chunkw = newSeriesWriter(p.chunkf, p.indexw) - - return p, nil -} - -func (p *persister) Close() error { - if err := p.chunkw.Close(); err != nil { - return err - } - if err := p.indexw.Close(); err != nil { - return err - } - if err := fileutil.Fsync(p.chunkf.File); err != nil { - return err - } - if err := fileutil.Fsync(p.indexf.File); err != nil { - return err - } - if err := p.chunkf.Close(); err != nil { - return err - } - if err := p.indexf.Close(); err != nil { return err } - return renameDir(p.tmpdir, p.dir) + indexw := newIndexWriter(indexf) + chunkw := newSeriesWriter(chunkf, indexw) + + if err := write(indexw, chunkw); err != nil { + return err + } + + if err := chunkw.Close(); err != nil { + return err + } + if err := indexw.Close(); err != nil { + return err + } + if err := fileutil.Fsync(chunkf.File); err != nil { + return err + } + if err := fileutil.Fsync(indexf.File); err != nil { + return err + } + if err := chunkf.Close(); err != nil { + return err + } + if err := indexf.Close(); err != nil { + return err + } + + return renameDir(tmpdir, dir) } func renameDir(from, to string) error { diff --git a/db.go b/db.go index 7c6ac60fb7..917d341181 100644 --- a/db.go +++ b/db.go @@ -385,18 +385,10 @@ func (s *Shard) persist() error { // before actually persisting it. dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime)) - p, err := newPersister(dir) - if err != nil { + if err := persist(dir, head.persist); err != nil { return err } - if err := head.persist(p); err != nil { - return err - } - if err := p.Close(); err != nil { - return err - } - sz := fmt.Sprintf("%.2fMB", float64(p.chunkw.Size()+p.indexw.Size())/1e6) - s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head") + s.logger.Log("samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head") // Reopen block as persisted block for querying. pb, err := newPersistedBlock(dir) diff --git a/head.go b/head.go index 78a968dbbc..49acccdce6 100644 --- a/head.go +++ b/head.go @@ -260,13 +260,13 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { return nil } -func (h *HeadBlock) persist(p *persister) error { +func (h *HeadBlock) persist(indexw IndexWriter, chunkw SeriesWriter) error { if err := h.wal.Close(); err != nil { return err } for ref, cd := range h.descs { - if err := p.chunkw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{ + if err := chunkw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{ { MinTime: cd.firsTimestamp, MaxTime: cd.lastTimestamp, @@ -277,7 +277,7 @@ func (h *HeadBlock) persist(p *persister) error { } } - if err := p.indexw.WriteStats(h.stats); err != nil { + if err := indexw.WriteStats(h.stats); err != nil { return err } for n, v := range h.values { @@ -286,13 +286,13 @@ func (h *HeadBlock) persist(p *persister) error { s = append(s, x) } - if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil { + if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { return err } } for t := range h.postings.m { - if err := p.indexw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil { + if err := indexw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil { return err } } @@ -301,7 +301,7 @@ func (h *HeadBlock) persist(p *persister) error { for i := range all { all[i] = uint32(i) } - if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil { + if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { return err } return nil