diff --git a/head.go b/head.go index c522907785..0a67400e0c 100644 --- a/head.go +++ b/head.go @@ -237,22 +237,28 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( minValidTime int64, - partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) + // Mitigate lock contention in getByID. + refSeries := map[uint64]*memSeries{} + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range input { for _, s := range samples { - if s.T < minValidTime || s.Ref%total != partition { + if s.T < minValidTime { continue } - ms := h.series.getByID(s.Ref) + ms := refSeries[s.Ref] if ms == nil { - unknownRefs++ - continue + ms = h.series.getByID(s.Ref) + if ms == nil { + unknownRefs++ + continue + } + refSeries[s.Ref] = ms } _, chunkCreated := ms.append(s.T, s.V) if chunkCreated { @@ -310,25 +316,22 @@ func (h *Head) loadWAL(r *wal.Reader) error { // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - firstInput = make(chan []RefSample, 300) - input = firstInput + wg sync.WaitGroup + n = runtime.GOMAXPROCS(0) + inputs = make([]chan []RefSample, n) + outputs = make([]chan []RefSample, n) ) wg.Add(n) for i := 0; i < n; i++ { - output := make(chan []RefSample, 300) + outputs[i] = make(chan []RefSample, 300) + inputs[i] = make(chan []RefSample, 300) - go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) + go func(input <-chan []RefSample, output chan<- []RefSample) { + unknown := h.processWALSamples(minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() - }(i, input, output) - - // The output feeds the next worker goroutine. For the last worker, - // it feeds the initial input again to reuse the RefSample slices. - input = output + }(inputs[i], outputs[i]) } var ( @@ -336,6 +339,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { series []RefSeries samples []RefSample tstones []Stone + err error ) for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] @@ -343,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { switch dec.Type(rec) { case RecordSeries: - series, err := dec.Series(rec, series) + series, err = dec.Series(rec, series) if err != nil { return errors.Wrap(err, "decode series") } @@ -355,7 +359,8 @@ func (h *Head) loadWAL(r *wal.Reader) error { } } case RecordSamples: - samples, err := dec.Samples(rec, samples) + samples, err = dec.Samples(rec, samples) + s := samples if err != nil { return errors.Wrap(err, "decode samples") } @@ -364,20 +369,31 @@ func (h *Head) loadWAL(r *wal.Reader) error { // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { - n := 5000 - if len(samples) < n { - n = len(samples) + m := 5000 + if len(samples) < m { + m = len(samples) } - var buf []RefSample - select { - case buf = <-input: - default: + shards := make([][]RefSample, n) + for i := 0; i < n; i++ { + var buf []RefSample + select { + case buf = <-outputs[i]: + default: + } + shards[i] = buf[:0] } - firstInput <- append(buf[:0], samples[:n]...) - samples = samples[n:] + for _, sam := range samples[:m] { + mod := sam.Ref % uint64(n) + shards[mod] = append(shards[mod], sam) + } + for i := 0; i < n; i++ { + inputs[i] <- shards[i] + } + samples = samples[m:] } + samples = s // Keep whole slice for reuse. case RecordTombstones: - tstones, err := dec.Tombstones(rec, tstones) + tstones, err = dec.Tombstones(rec, tstones) if err != nil { return errors.Wrap(err, "decode tombstones") } @@ -397,9 +413,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { return errors.Wrap(r.Err(), "read records") } - // Signal termination to first worker and wait for last one to close its output channel. - close(firstInput) - for range input { + // Signal termination to each worker and wait for it to close its output channel. + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } } wg.Wait() @@ -1341,6 +1359,7 @@ type memSeries struct { ref uint64 lset labels.Labels chunks []*memChunk + headChunk *memChunk chunkRange int64 firstChunkID int @@ -1374,6 +1393,7 @@ func (s *memSeries) cut(mint int64) *memChunk { maxTime: math.MinInt64, } s.chunks = append(s.chunks, c) + s.headChunk = c // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. @@ -1442,6 +1462,11 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { } s.chunks = append(s.chunks[:0], s.chunks[k:]...) s.firstChunkID += k + if len(s.chunks) == 0 { + s.headChunk = nil + } else { + s.headChunk = s.chunks[len(s.chunks)-1] + } return k } @@ -1524,10 +1549,7 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator { } func (s *memSeries) head() *memChunk { - if len(s.chunks) == 0 { - return nil - } - return s.chunks[len(s.chunks)-1] + return s.headChunk } type memChunk struct {