diff --git a/storage/metric/memory.go b/storage/metric/memory.go index f4af209606..5cfb20ec94 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -288,13 +288,14 @@ func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- clien } s.RUnlock() - s.Lock() for _, fingerprint := range emptySeries { - if s.fingerprintToSeries[fingerprint].size() == 0 { + if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 { + s.Lock() s.dropSeries(&fingerprint) + s.Unlock() + } } - s.Unlock() } // Drop all references to a series, including any samples. diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 87c3d81765..350e81cb8e 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -93,6 +93,8 @@ type TieredStorage struct { wmCache *watermarkCache Indexer MetricIndexer + + flushSema chan bool } // viewJob encapsulates a request to extract sample values from the datastore. @@ -136,6 +138,8 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn memorySemaphore: make(chan bool, tieredMemorySemaphores), wmCache: wmCache, + + flushSema: make(chan bool, 1), } for i := 0; i < tieredMemorySemaphores; i++ { @@ -235,11 +239,18 @@ func (t *TieredStorage) Serve(started chan<- bool) { }() started <- true - for { select { case <-flushMemoryTicker.C: - t.flushMemory(t.memoryTTL) + select { + case t.flushSema <- true: + go func() { + t.flushMemory(t.memoryTTL) + <-t.flushSema + }() + default: + glog.Warning("Backlogging on flush...") + } case viewRequest := <-t.ViewQueue: viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop() <-t.memorySemaphore @@ -247,6 +258,7 @@ func (t *TieredStorage) Serve(started chan<- bool) { case drainingDone := <-t.draining: t.Flush() drainingDone <- true + return } } @@ -261,7 +273,9 @@ func (t *TieredStorage) reportQueues() { } func (t *TieredStorage) Flush() { + t.flushSema <- true t.flushMemory(0) + <-t.flushSema } func (t *TieredStorage) flushMemory(ttl time.Duration) {