From 1d5f85817dcd582090fbc42e62e74d4e192b6cf6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 8 Sep 2017 08:48:19 +0200 Subject: [PATCH] Fix various races --- head.go | 35 +++++++++++++++++++---------------- wal.go | 14 ++++++++++++-- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/head.go b/head.go index a74552bcaf..4e1e858773 100644 --- a/head.go +++ b/head.go @@ -402,9 +402,11 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if s == nil { return errors.Wrap(ErrNotFound, "unknown series") } + s.Lock() if err := s.appendable(t, v); err != nil { return err } + s.Unlock() if t < a.mint { return ErrOutOfBounds @@ -435,7 +437,10 @@ func (a *headAppender) Commit() error { total := len(a.samples) for _, s := range a.samples { + s.series.Lock() ok, chunkCreated := s.series.append(s.T, s.V) + s.series.Unlock() + if !ok { total-- } @@ -672,9 +677,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { s := h.head.series.getByID(sid) - s.mtx.RLock() + s.Lock() c := s.chunk(int(cid)) - s.mtx.RUnlock() + s.Unlock() // Do not expose chunks that are outside of the specified range. if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { @@ -694,9 +699,10 @@ type safeChunk struct { } func (c *safeChunk) Iterator() chunks.Iterator { - c.s.mtx.RLock() - defer c.s.mtx.RUnlock() - return c.s.iterator(c.cid) + c.s.Lock() + it := c.s.iterator(c.cid) + c.s.Unlock() + return it } // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } @@ -803,8 +809,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM } *lbls = append((*lbls)[:0], s.lset...) - s.mtx.RLock() - defer s.mtx.RUnlock() + s.Lock() + defer s.Unlock() *chks = (*chks)[:0] @@ -956,11 +962,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { for hash, all := range s.hashes[i] { for _, series := range all { - series.mtx.Lock() + series.Lock() rmChunks += series.truncateChunksBefore(mint) if len(series.chunks) > 0 { - series.mtx.Unlock() + series.Unlock() continue } @@ -983,7 +989,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { s.locks[j].Unlock() } - series.mtx.Unlock() + series.Unlock() } } @@ -1040,8 +1046,10 @@ type sample struct { v float64 } +// memSeries is the in-memory representation of a series. None of its methods +// are goroutine safe and its the callers responsibility to lock it. type memSeries struct { - mtx sync.RWMutex + sync.Mutex ref uint64 lset labels.Labels @@ -1143,8 +1151,6 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { const samplesPerChunk = 120 - s.mtx.Lock() - c := s.head() if c == nil { @@ -1152,7 +1158,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { chunkCreated = true } if c.maxTime >= t { - s.mtx.Unlock() return false, chunkCreated } if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { @@ -1175,8 +1180,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - s.mtx.Unlock() - return true, chunkCreated } diff --git a/wal.go b/wal.go index 68c48838cc..747510fd60 100644 --- a/wal.go +++ b/wal.go @@ -398,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { buf := w.getBuffer() flag := w.encodeSeries(buf, series) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySeries, flag, buf.get()) w.putBuffer(buf) @@ -425,6 +429,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { buf := w.getBuffer() flag := w.encodeSamples(buf, samples) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySamples, flag, buf.get()) w.putBuffer(buf) @@ -451,6 +459,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { buf := w.getBuffer() flag := w.encodeDeletes(buf, stones) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntryDeletes, flag, buf.get()) w.putBuffer(buf) @@ -661,8 +673,6 @@ const ( ) func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { - w.mtx.Lock() - defer w.mtx.Unlock() // Cut to the next segment if the entry exceeds the file size unless it would also // exceed the size of a new segment. // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.