From f0e79ec264b69dd286840af349ffd8546b03e444 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 31 Oct 2018 14:12:39 +0000 Subject: [PATCH 1/4] Actually reuse samples in loadWAL across records. This cuts walltime by 2.5X and CPU by 2X Signed-off-by: Brian Brazil --- head.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/head.go b/head.go index 92d8a128fa..e8794b66d8 100644 --- a/head.go +++ b/head.go @@ -336,6 +336,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { series []RefSeries samples []RefSample tstones []Stone + err error ) for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] @@ -343,7 +344,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { switch dec.Type(rec) { case RecordSeries: - series, err := dec.Series(rec, series) + series, err = dec.Series(rec, series) if err != nil { return errors.Wrap(err, "decode series") } @@ -355,7 +356,8 @@ func (h *Head) loadWAL(r *wal.Reader) error { } } case RecordSamples: - samples, err := dec.Samples(rec, samples) + samples, err = dec.Samples(rec, samples) + s := samples if err != nil { return errors.Wrap(err, "decode samples") } @@ -376,8 +378,9 @@ func (h *Head) loadWAL(r *wal.Reader) error { firstInput <- append(buf[:0], samples[:n]...) samples = samples[n:] } + samples = s // Keep whole slice for reuse. case RecordTombstones: - tstones, err := dec.Tombstones(rec, tstones) + tstones, err = dec.Tombstones(rec, tstones) if err != nil { return errors.Wrap(err, "decode tombstones") } From d8c8e4e6e4f690c0bd7c165f7b9718fce58c165a Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 31 Oct 2018 12:51:21 +0000 Subject: [PATCH 2/4] Keep local cache of ids. With the various goroutines running, the locking in getByID is notable. This cuts cpu usage by ~25% and walltime by ~20%. 
Signed-off-by: Brian Brazil --- head.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/head.go b/head.go index e8794b66d8..894febb219 100644 --- a/head.go +++ b/head.go @@ -242,6 +242,9 @@ func (h *Head) processWALSamples( ) (unknownRefs uint64) { defer close(output) + // Mitigate lock contention in getByID. + refSeries := map[uint64]*memSeries{} + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range input { @@ -249,10 +252,14 @@ func (h *Head) processWALSamples( if s.T < minValidTime || s.Ref%total != partition { continue } - ms := h.series.getByID(s.Ref) + ms := refSeries[s.Ref] if ms == nil { - unknownRefs++ - continue + ms = h.series.getByID(s.Ref) + if ms == nil { + unknownRefs++ + continue + } + refSeries[s.Ref] = ms } _, chunkCreated := ms.append(s.T, s.V) if chunkCreated { From a64b0d51c4da614efb493627a7b5425bc65c6769 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 31 Oct 2018 13:28:56 +0000 Subject: [PATCH 3/4] Precalculate memSeries.head This is read far more than it changes. This cuts ~14% off walltime and ~27% off CPU for WAL reading. Signed-off-by: Brian Brazil --- head.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/head.go b/head.go index 894febb219..690489b83c 100644 --- a/head.go +++ b/head.go @@ -1338,6 +1338,7 @@ type memSeries struct { ref uint64 lset labels.Labels chunks []*memChunk + headChunk *memChunk chunkRange int64 firstChunkID int @@ -1371,6 +1372,7 @@ func (s *memSeries) cut(mint int64) *memChunk { maxTime: math.MinInt64, } s.chunks = append(s.chunks, c) + s.headChunk = c // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. @@ -1439,6 +1441,11 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { } s.chunks = append(s.chunks[:0], s.chunks[k:]...) 
s.firstChunkID += k + if len(s.chunks) == 0 { + s.headChunk = nil + } else { + s.headChunk = s.chunks[len(s.chunks)-1] + } return k } @@ -1521,10 +1528,7 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator { } func (s *memSeries) head() *memChunk { - if len(s.chunks) == 0 { - return nil - } - return s.chunks[len(s.chunks)-1] + return s.headChunk } type memChunk struct { From c7e7fd355e524e4212851000f1673b853fb0f3c2 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 31 Oct 2018 22:52:26 +0000 Subject: [PATCH 4/4] Only send WAL read workers the samples they need. Calculating the modulus in each worker was a hotspot, and meant that you had more work to do the more cores you had. This cuts CPU usage (on my 8 core, 4 real core machine) by 33%, and walltime by 3% Signed-off-by: Brian Brazil --- head.go | 60 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/head.go b/head.go index 690489b83c..b5cc00ffb1 100644 --- a/head.go +++ b/head.go @@ -237,7 +237,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( minValidTime int64, - partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) @@ -249,7 +248,7 @@ func (h *Head) processWALSamples( for samples := range input { for _, s := range samples { - if s.T < minValidTime || s.Ref%total != partition { + if s.T < minValidTime { continue } ms := refSeries[s.Ref] @@ -317,25 +316,22 @@ func (h *Head) loadWAL(r *wal.Reader) error { // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. 
var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - firstInput = make(chan []RefSample, 300) - input = firstInput + wg sync.WaitGroup + n = runtime.GOMAXPROCS(0) + inputs = make([]chan []RefSample, n) + outputs = make([]chan []RefSample, n) ) wg.Add(n) for i := 0; i < n; i++ { - output := make(chan []RefSample, 300) + outputs[i] = make(chan []RefSample, 300) + inputs[i] = make(chan []RefSample, 300) - go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) + go func(input <-chan []RefSample, output chan<- []RefSample) { + unknown := h.processWALSamples(minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() - }(i, input, output) - - // The output feeds the next worker goroutine. For the last worker, - // it feeds the initial input again to reuse the RefSample slices. - input = output + }(inputs[i], outputs[i]) } var ( @@ -373,17 +369,27 @@ func (h *Head) loadWAL(r *wal.Reader) error { // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { - n := 5000 - if len(samples) < n { - n = len(samples) + m := 5000 + if len(samples) < m { + m = len(samples) } - var buf []RefSample - select { - case buf = <-input: - default: + shards := make([][]RefSample, n) + for i := 0; i < n; i++ { + var buf []RefSample + select { + case buf = <-outputs[i]: + default: + } + shards[i] = buf[:0] } - firstInput <- append(buf[:0], samples[:n]...) - samples = samples[n:] + for _, sam := range samples[:m] { + mod := sam.Ref % uint64(n) + shards[mod] = append(shards[mod], sam) + } + for i := 0; i < n; i++ { + inputs[i] <- shards[i] + } + samples = samples[m:] } samples = s // Keep whole slice for reuse. 
case RecordTombstones: @@ -407,9 +413,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { return errors.Wrap(r.Err(), "read records") } - // Signal termination to first worker and wait for last one to close its output channel. - close(firstInput) - for range input { + // Signal termination to each worker and wait for it to close its output channel. + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } } wg.Wait()