From 66ff7b12e9a4cfe033c8872ac027015f455be3ba Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 8 Aug 2017 17:35:34 +0200 Subject: [PATCH] Pool Chunk objects during compaction --- block.go | 5 +-- chunks.go | 30 ++++++++-------- chunks/chunk.go | 54 ++++++++++++++++++++++++++++- compact.go | 20 ++++++++--- db.go | 14 +++++--- head.go | 92 ++++++++++++++++++++++++------------------------- 6 files changed, 142 insertions(+), 73 deletions(-) diff --git a/block.go b/block.go index 06896b8946..733b1bd6be 100644 --- a/block.go +++ b/block.go @@ -22,6 +22,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -181,13 +182,13 @@ type persistedBlock struct { tombstones tombstoneReader } -func newPersistedBlock(dir string) (*persistedBlock, error) { +func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err } - cr, err := newChunkReader(chunkDir(dir)) + cr, err := newChunkReader(chunkDir(dir), pool) if err != nil { return nil, err } diff --git a/chunks.go b/chunks.go index 477d9588c7..6bed69700f 100644 --- a/chunks.go +++ b/chunks.go @@ -238,20 +238,22 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { } } - b := make([]byte, binary.MaxVarintLen32) - seq := uint64(w.seq()) << 32 - + var ( + b = [binary.MaxVarintLen32]byte{} + seq = uint64(w.seq()) << 32 + ) for i := range chks { chk := &chks[i] chk.Ref = seq | uint64(w.n) - n := binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) + n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) if err := w.write(b[:n]); err != nil { return err } - if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil { + b[0] = byte(chk.Chunk.Encoding()) + if err := w.write(b[:1]); err != nil { return err } if err := w.write(chk.Chunk.Bytes()); err != nil { @@ -262,7 +264,7 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { if err := chk.writeHash(w.crc32); err != nil { return err } - if err := w.write(w.crc32.Sum(nil)); err != nil { + if err := w.write(w.crc32.Sum(b[:0])); err != nil { return err } } @@ -295,15 +297,20 @@ type chunkReader struct { // Closers for resources behind the byte slices. cs []io.Closer + + pool chunks.Pool } // newChunkReader returns a new chunkReader based on mmaped files found in dir. -func newChunkReader(dir string) (*chunkReader, error) { +func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { files, err := sequenceFiles(dir, "") if err != nil { return nil, err } - var cr chunkReader + if pool == nil { + pool = chunks.NewPool() + } + cr := chunkReader{pool: pool} for _, fn := range files { f, err := openMmapFile(fn) @@ -350,11 +357,6 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { return nil, fmt.Errorf("reading chunk length failed") } b = b[n:] - enc := chunks.Encoding(b[0]) - c, err := chunks.FromData(enc, b[1:1+l]) - if err != nil { - return nil, err - } - return c, nil + return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l]) } diff --git a/chunks/chunk.go b/chunks/chunk.go index 6bed4455fe..21b00b3ebe 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -13,7 +13,12 @@ package chunks -import "fmt" +import ( + "fmt" + "sync" + + "github.com/pkg/errors" +) // Encoding is the identifier for a chunk encoding. type Encoding uint8 @@ -63,3 +68,50 @@ type Iterator interface { Err() error Next() bool } + +type Pool interface { + Put(Chunk) error + Get(e Encoding, b []byte) (Chunk, error) +} + +// Pool is a memory pool of chunk objects. +type pool struct { + xor sync.Pool +} + +func NewPool() Pool { + return &pool{ + xor: sync.Pool{ + New: func() interface{} { + return &XORChunk{b: &bstream{}} + }, + }, + } +} + +func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { + switch e { + case EncXOR: + c := p.xor.Get().(*XORChunk) + c.b.stream = b + c.b.count = 0 + return c, nil + } + return nil, errors.Errorf("invalid encoding %q", e) +} + +func (p *pool) Put(c Chunk) error { + switch c.Encoding() { + case EncXOR: + xc, ok := c.(*XORChunk) + if !ok { + return nil + } + xc.b.stream = nil + xc.b.count = 0 + p.xor.Put(c) + default: + return errors.Errorf("invalid encoding %q", c.Encoding()) + } + return nil +} diff --git a/compact.go b/compact.go index d0d90ea1e2..5d2e4c4ce2 100644 --- a/compact.go +++ b/compact.go @@ -100,9 +100,15 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { type compactorOptions struct { blockRanges []int64 + chunkPool chunks.Pool } -func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { +func NewCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { + if opts == nil { + opts = &compactorOptions{ + chunkPool: chunks.NewPool(), + } + } return &compactor{ dir: dir, opts: opts, @@ -288,7 +294,7 @@ func (c *compactor) Compact(dirs ...string) (err error) { var blocks []Block for _, d := range dirs { - b, err := newPersistedBlock(d) + b, err := newPersistedBlock(d, c.opts.chunkPool) if err != nil { return err } @@ -350,7 +356,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { return errors.Wrap(err, "open index writer") } - meta, err := populateBlock(blocks, indexw, chunkw) + meta, err := c.populateBlock(blocks, indexw, chunkw) if err != nil { return errors.Wrap(err, "write compaction") } @@ -397,7 +403,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { +func (c *compactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var ( set compactionSet metas []BlockMeta @@ -474,7 +480,6 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo } } } - if err := chunkw.WriteChunks(chks...); err != nil { return nil, err } @@ -489,6 +494,10 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) } + for _, chk := range chks { + c.opts.chunkPool.Put(chk.Chunk) + } + for _, l := range lset { valset, ok := values[l.Name] if !ok { @@ -685,6 +694,7 @@ func (c *compactionMerger) Next() bool { c.aok = c.a.Next() c.bok = c.b.Next() } + return true } diff --git a/db.go b/db.go index 928e8e9e97..fe48b3ce14 100644 --- a/db.go +++ b/db.go @@ -37,6 +37,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -95,9 +96,10 @@ type DB struct { dir string lockf *lockfile.Lockfile - logger log.Logger - metrics *dbMetrics - opts *Options + logger log.Logger + metrics *dbMetrics + opts *Options + chunkPool chunks.Pool // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex @@ -203,6 +205,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), compactionsEnabled: true, + chunkPool: chunks.NewPool(), } db.metrics = newDBMetrics(db, r) @@ -223,6 +226,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db copts := &compactorOptions{ blockRanges: opts.BlockRanges, + chunkPool: db.chunkPool, } if len(copts.blockRanges) == 0 { @@ -238,7 +242,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1] } - db.compactor = newCompactor(dir, r, l, copts) + db.compactor = NewCompactor(dir, r, l, copts) if err := db.reloadBlocks(); err != nil { return nil, err @@ -508,7 +512,7 @@ func (db *DB) reloadBlocks() (err error) { if meta.Compaction.Generation == 0 { b, err = db.openHeadBlock(dir) } else { - b, err = newPersistedBlock(dir) + b, err = newPersistedBlock(dir, db.chunkPool) } if err != nil { return errors.Wrapf(err, "open block %s", dir) diff --git a/head.go b/head.go index 4f5ed57557..fae0937ece 100644 --- a/head.go +++ b/head.go @@ -270,62 +270,62 @@ Outer: // This has been ensured by acquiring a Lock on DB.mtx, but this limitation should // be removed in the future. func (h *HeadBlock) Snapshot(snapshotDir string) error { - if h.meta.Stats.NumSeries == 0 { - return nil - } + // if h.meta.Stats.NumSeries == 0 { + // return nil + // } - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - uid := ulid.MustNew(ulid.Now(), entropy) + // entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + // uid := ulid.MustNew(ulid.Now(), entropy) - dir := filepath.Join(snapshotDir, uid.String()) - tmp := dir + ".tmp" + // dir := filepath.Join(snapshotDir, uid.String()) + // tmp := dir + ".tmp" - if err := os.RemoveAll(tmp); err != nil { - return err - } + // if err := os.RemoveAll(tmp); err != nil { + // return err + // } - if err := os.MkdirAll(tmp, 0777); err != nil { - return err - } + // if err := os.MkdirAll(tmp, 0777); err != nil { + // return err + // } - // Populate chunk and index files into temporary directory with - // data of all blocks. - chunkw, err := newChunkWriter(chunkDir(tmp)) - if err != nil { - return errors.Wrap(err, "open chunk writer") - } - indexw, err := newIndexWriter(tmp) - if err != nil { - return errors.Wrap(err, "open index writer") - } + // // Populate chunk and index files into temporary directory with + // // data of all blocks. + // chunkw, err := newChunkWriter(chunkDir(tmp)) + // if err != nil { + // return errors.Wrap(err, "open chunk writer") + // } + // indexw, err := newIndexWriter(tmp) + // if err != nil { + // return errors.Wrap(err, "open index writer") + // } - meta, err := populateBlock([]Block{h}, indexw, chunkw) - if err != nil { - return errors.Wrap(err, "write snapshot") - } - meta.ULID = uid - meta.MaxTime = h.highTimestamp + // meta, err := h.compactor.populateBlock([]Block{h}, indexw, chunkw, nil) + // if err != nil { + // return errors.Wrap(err, "write snapshot") + // } + // meta.ULID = uid + // meta.MaxTime = h.highTimestamp - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } + // if err = writeMetaFile(tmp, meta); err != nil { + // return errors.Wrap(err, "write merged meta") + // } - if err = chunkw.Close(); err != nil { - return errors.Wrap(err, "close chunk writer") - } - if err = indexw.Close(); err != nil { - return errors.Wrap(err, "close index writer") - } + // if err = chunkw.Close(); err != nil { + // return errors.Wrap(err, "close chunk writer") + // } + // if err = indexw.Close(); err != nil { + // return errors.Wrap(err, "close index writer") + // } - // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { - return errors.Wrap(err, "write new tombstones file") - } + // // Create an empty tombstones file. + // if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { + // return errors.Wrap(err, "write new tombstones file") + // } - // Block successfully written, make visible - if err := renameFile(tmp, dir); err != nil { - return errors.Wrap(err, "rename block dir") - } + // // Block successfully written, make visible + // if err := renameFile(tmp, dir); err != nil { + // return errors.Wrap(err, "rename block dir") + // } return nil }