diff --git a/tsdb/head.go b/tsdb/head.go index d3b2b09cce..419340506d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1666,26 +1666,34 @@ func (h *Head) mmapHeadChunks() { var count int for i := 0; i < h.series.size; i++ { h.series.locks[i].RLock() - for _, all := range h.series.hashes[i] { - for _, series := range all { - series.Lock() - count += series.mmapChunks(h.chunkDiskMapper) - series.Unlock() - } + for _, series := range h.series.series[i] { + series.Lock() + count += series.mmapChunks(h.chunkDiskMapper) + series.Unlock() } h.series.locks[i].RUnlock() } h.metrics.mmapChunksTotal.Add(float64(count)) } -// seriesHashmap is a simple hashmap for memSeries by their label set. It is built -// on top of a regular hashmap and holds a slice of series to resolve hash collisions. +// seriesHashmap lets TSDB find a memSeries by its label set, via a 64-bit hash. +// There is one map for the common case where the hash value is unique, and a +// second map for the case that two series have the same hash value. +// Each series is in only one of the maps. // Its methods require the hash to be submitted with it to avoid re-computations throughout // the code. -type seriesHashmap map[uint64][]*memSeries +type seriesHashmap struct { + unique map[uint64]*memSeries + conflicts map[uint64][]*memSeries +} -func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { - for _, s := range m[hash] { +func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { + if s, found := m.unique[hash]; found { + if labels.Equal(s.lset, lset) { + return s + } + } + for _, s := range m.conflicts[hash] { if labels.Equal(s.lset, lset) { return s } @@ -1694,27 +1702,49 @@ func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { } func (m seriesHashmap) set(hash uint64, s *memSeries) { - l := m[hash] + if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) { + m.unique[hash] = s + return + } + if m.conflicts == nil { + m.conflicts = make(map[uint64][]*memSeries) + } + l := m.conflicts[hash] for i, prev := range l { if labels.Equal(prev.lset, s.lset) { l[i] = s return } } - m[hash] = append(l, s) + m.conflicts[hash] = append(l, s) } func (m seriesHashmap) del(hash uint64, lset labels.Labels) { var rem []*memSeries - for _, s := range m[hash] { - if !labels.Equal(s.lset, lset) { - rem = append(rem, s) + unique, found := m.unique[hash] + switch { + case !found: + return + case labels.Equal(unique.lset, lset): + conflicts := m.conflicts[hash] + if len(conflicts) == 0 { + delete(m.unique, hash) + return + } + rem = conflicts + default: + rem = append(rem, unique) + for _, s := range m.conflicts[hash] { + if !labels.Equal(s.lset, lset) { + rem = append(rem, s) + } } } - if len(rem) == 0 { - delete(m, hash) + m.unique[hash] = rem[0] + if len(rem) == 1 { + delete(m.conflicts, hash) } else { - m[hash] = rem + m.conflicts[hash] = rem[1:] } } @@ -1756,7 +1786,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st s.series[i] = map[chunks.HeadSeriesRef]*memSeries{} } for i := range s.hashes { - s.hashes[i] = seriesHashmap{} + s.hashes[i] = seriesHashmap{ + unique: map[uint64]*memSeries{}, + conflicts: nil, // Initialized on demand in set(). + } } return s } @@ -1776,70 +1809,72 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( deletedFromPrevStripe = 0 ) minMmapFile = math.MaxInt32 - // Run through all series and truncate old chunks. Mark those with no - // chunks left as deleted and store their ID. + + // For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID. + check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) { + series.Lock() + defer series.Unlock() + + rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef) + + if len(series.mmappedChunks) > 0 { + seq, _ := series.mmappedChunks[0].ref.Unpack() + if seq < minMmapFile { + minMmapFile = seq + } + } + if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 { + seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack() + if seq < minMmapFile { + minMmapFile = seq + } + for _, ch := range series.ooo.oooMmappedChunks { + if ch.minTime < minOOOTime { + minOOOTime = ch.minTime + } + } + } + if series.ooo != nil && series.ooo.oooHeadChunk != nil { + if series.ooo.oooHeadChunk.minTime < minOOOTime { + minOOOTime = series.ooo.oooHeadChunk.minTime + } + } + if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit || + (series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) { + seriesMint := series.minTime() + if seriesMint < actualMint { + actualMint = seriesMint + } + return + } + // The series is gone entirely. We need to keep the series lock + // and make sure we have acquired the stripe locks for hash and ID of the + // series alike. + // If we don't hold them all, there's a very small chance that a series receives + // samples again while we are half-way into deleting it. + refShard := int(series.ref) & (s.size - 1) + if hashShard != refShard { + s.locks[refShard].Lock() + defer s.locks[refShard].Unlock() + } + + deleted[storage.SeriesRef(series.ref)] = struct{}{} + s.hashes[hashShard].del(hash, series.lset) + delete(s.series[refShard], series.ref) + deletedForCallback[series.ref] = series.lset + } + + // Run through all series shard by shard, checking which should be deleted. for i := 0; i < s.size; i++ { deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe) s.locks[i].Lock() - for hash, all := range s.hashes[i] { + for hash, series := range s.hashes[i].unique { + check(i, hash, series, deletedForCallback) + } + for hash, all := range s.hashes[i].conflicts { for _, series := range all { - series.Lock() - rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef) - - if len(series.mmappedChunks) > 0 { - seq, _ := series.mmappedChunks[0].ref.Unpack() - if seq < minMmapFile { - minMmapFile = seq - } - } - if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 { - seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack() - if seq < minMmapFile { - minMmapFile = seq - } - for _, ch := range series.ooo.oooMmappedChunks { - if ch.minTime < minOOOTime { - minOOOTime = ch.minTime - } - } - } - if series.ooo != nil && series.ooo.oooHeadChunk != nil { - if series.ooo.oooHeadChunk.minTime < minOOOTime { - minOOOTime = series.ooo.oooHeadChunk.minTime - } - } - if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit || - (series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) { - seriesMint := series.minTime() - if seriesMint < actualMint { - actualMint = seriesMint - } - series.Unlock() - continue - } - - // The series is gone entirely. We need to keep the series lock - // and make sure we have acquired the stripe locks for hash and ID of the - // series alike. - // If we don't hold them all, there's a very small chance that a series receives - // samples again while we are half-way into deleting it. - j := int(series.ref) & (s.size - 1) - - if i != j { - s.locks[j].Lock() - } - - deleted[storage.SeriesRef(series.ref)] = struct{}{} - s.hashes[i].del(hash, series.lset) - delete(s.series[j], series.ref) - deletedForCallback[series.ref] = series.lset - - if i != j { - s.locks[j].Unlock() - } - - series.Unlock() + check(i, hash, series, deletedForCallback) } }