From 972d94433a8caa214bf6fef5c6179b9af46cfd1c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 25 Jan 2016 18:44:43 +0100 Subject: [PATCH 1/3] Introduce a hysteresis for "rushed mode" "Rushed mode" is formerly known as "degraded mode", which is changed with this commit, too. The name "degraded" was very misleading. Also, switch into rushed mode if we have too many chunks in memory and an at least reasonable amount of chunks to persist so that speeding up persisting chunks can help. --- storage/local/storage.go | 126 +++++++++++++++++++++++++++++---------- 1 file changed, 93 insertions(+), 33 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 4e75f5f8d7..3afc9fb86d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -17,6 +17,7 @@ package local import ( "container/list" "fmt" + "math" "sync/atomic" "time" @@ -38,11 +39,28 @@ const ( // See waitForNextFP. maxEvictInterval = time.Minute - // If numChunskToPersist is this percentage of maxChunksToPersist, we - // consider the storage in "graceful degradation mode", i.e. we do not - // checkpoint anymore based on the dirty series count, and we do not - // sync series files anymore if using the adaptive sync strategy. - percentChunksToPersistForDegradation = 80 + // Constants to control the hysteresis of entering and leaving "rushed + // mode". In rushed mode, the dirty series count is ignored for + // checkpointing, and series files are not synced if the adaptive sync + // strategy is used. + // + // If we reach 80% of -storage.local.max-chunks-to-persist, we enter + // "rushed mode". + factorChunksToPersistForEnteringRushedMode = 0.8 + // To leave "rushed mode", we must be below 70% of + // -storage.local.max-chunks-to-persist. + factorChunksToPersistForLeavingRushedMode = 0.7 + // To enter "rushed mode" for other reasons (see below), we must have at + // least 30% of -storage.local.max-chunks-to-persist. 
+ factorMinChunksToPersistToAllowRushedMode = 0.3 + // If the number of chunks in memory reaches 110% of + // -storage.local.memory-chunks, we will enter "rushed mode" (provided + // we have enough chunks to persist at all, see + // factorMinChunksToPersistToAllowRushedMode.) + factorMemChunksForEnteringRushedMode = 1.1 + // To leave "rushed mode", we must be below 105% of + // -storage.local.memory-chunks. + factorMemChunksForLeavingRushedMode = 1.05 ) var ( @@ -110,7 +128,7 @@ type memorySeriesStorage struct { // numChunksToPersist has to be aligned for atomic operations. numChunksToPersist int64 // The number of chunks waiting for persistence. maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall. - degraded bool + rushed bool // Whether the storage is in rushed mode. fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -238,7 +256,7 @@ func (s *memorySeriesStorage) Start() (err error) { case Always: syncStrategy = func() bool { return true } case Adaptive: - syncStrategy = func() bool { return !s.isDegraded() } + syncStrategy = func() bool { return !s.inRushedMode() } default: panic("unknown sync strategy") } @@ -898,9 +916,8 @@ loop: // would be counterproductive, as it would slow down chunk persisting even more, // while in a situation like that, where we are clearly lacking speed of disk // maintenance, the best we can do for crash recovery is to persist chunks as - // quickly as possible. So only checkpoint if the storage is not in "graceful - // degradation mode". - if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.isDegraded() { + // quickly as possible. So only checkpoint if the storage is not in "rushed mode". 
+ if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.inRushedMode() { checkpointTimer.Reset(0) } } @@ -1144,37 +1161,80 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) { atomic.AddInt64(&s.numChunksToPersist, int64(by)) } -// isDegraded returns whether the storage is in "graceful degradation mode", -// which is the case if the number of chunks waiting for persistence has reached -// a percentage of maxChunksToPersist that exceeds -// percentChunksToPersistForDegradation. The method is not goroutine safe (but -// only ever called from the goroutine dealing with series maintenance). -// Changes of degradation mode are logged. -func (s *memorySeriesStorage) isDegraded() bool { - nowDegraded := s.getNumChunksToPersist() > s.maxChunksToPersist*percentChunksToPersistForDegradation/100 - if s.degraded && !nowDegraded { - log.Warn("Storage has left graceful degradation mode. Things are back to normal.") - } else if !s.degraded && nowDegraded { - log.Warnf( - "%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", - s.getNumChunksToPersist(), - s.getNumChunksToPersist()*100/s.maxChunksToPersist, - s.maxChunksToPersist, - s.checkpointInterval) +// inRushedMode returns whether the storage is in "rushed mode", which is the +// case if there are too many chunks waiting for persistence or there are too +// many chunks in memory. The method is not goroutine safe (but only ever called +// from the goroutine dealing with series maintenance). Changes of degradation +// mode are logged. 
+func (s *memorySeriesStorage) inRushedMode() bool { + chunksToPersist := float64(s.getNumChunksToPersist()) + memChunks := float64(atomic.LoadInt64(&numMemChunks)) + + if s.rushed { + // We are already in rushed mode, so check if we can get out of + // it, using the lower hysteresis thresholds. + s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForLeavingRushedMode || + memChunks > float64(s.maxMemoryChunks)*factorMemChunksForLeavingRushedMode + if !s.rushed { + log.Warn("Storage has left rushed mode. Things are back to normal.") + } + return s.rushed } - s.degraded = nowDegraded - return s.degraded + // We are not rushed yet, so check the higher hysteresis threshold if we enter it now. + // First WRT chunksToPersist... + s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode + if s.rushed { + log.Warnf( + "%.0f chunks waiting for persistence (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", + chunksToPersist, + chunksToPersist*100/float64(s.maxChunksToPersist), + s.maxChunksToPersist, + s.checkpointInterval, + ) + return true + } + // ...then WRT memChunks. + s.rushed = memChunks > float64(s.maxMemoryChunks)*factorMemChunksForEnteringRushedMode && + chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode + if s.rushed { + log.Warnf( + "%.0f chunks in memory (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. 
Series maintenance happens as frequently as possible.", + memChunks, + memChunks*100/float64(s.maxMemoryChunks), + s.maxMemoryChunks, + s.checkpointInterval, + ) + } + return s.rushed } -// persistenceBacklogScore works similar to isDegraded, but returns a score +// persistenceBacklogScore works similar to inRushedMode, but returns a score // about how close we are to degradation. This score is 1.0 if no chunks are -// waiting for persistence and 0.0 if we are at or above the degradation -// threshold. +// waiting for persistence or we are not over the threshold for memory chunks, +// and 0.0 if we are at or above the thresholds. However, the score is always 0 +// if the storage is currently in rushed mode. (Getting out of it has a +// hysteresis, so we might be below thresholds again but still in rushed mode.) func (s *memorySeriesStorage) persistenceBacklogScore() float64 { - score := 1 - float64(s.getNumChunksToPersist())/float64(s.maxChunksToPersist*percentChunksToPersistForDegradation/100) + if s.inRushedMode() { + return 0 + } + + chunksToPersist := float64(s.getNumChunksToPersist()) + score := 1 - chunksToPersist/(float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode) + + if chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode { + memChunks := float64(atomic.LoadInt64(&numMemChunks)) + score = math.Min( + score, + 1-(memChunks/float64(s.maxMemoryChunks)-1)/(factorMemChunksForEnteringRushedMode-1), + ) + } if score < 0 { return 0 } + if score > 1 { + return 1 + } return score } From a2cd479058e8da3fc98679ba4d0ad6c2accb0c8e Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 25 Jan 2016 19:33:51 +0100 Subject: [PATCH 2/3] Fix calculation of chunks to persist after restart Since we are not overestimating the number of chunks to persist anymore, this commit also adjusts the default value for -storage.local.memory-chunks. Update of documentation will follow. 
--- cmd/prometheus/config.go | 2 +- storage/local/persistence.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 1c5e7679a3..839b3f4ce3 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -113,7 +113,7 @@ func init() { "How long to retain samples in the local storage.", ) cfg.fs.IntVar( - &cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 1024*1024, + &cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024, "How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.", ) cfg.fs.DurationVar( diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 96a478a04d..72b989ccb7 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -861,6 +861,12 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in } } + headChunkClosed := persistWatermark >= numChunkDescs + if !headChunkClosed { + // Head chunk is not ready for persisting yet. 
+ chunksToPersist-- + } + fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{ metric: model.Metric(metric), chunkDescs: chunkDescs, @@ -869,7 +875,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: model.Time(savedFirstTime), lastTime: chunkDescs[len(chunkDescs)-1].lastTime(), - headChunkClosed: persistWatermark >= numChunkDescs, + headChunkClosed: headChunkClosed, } } return sm, chunksToPersist, nil From 87ef24cd2588d3c9962c903472f6edd0b5930532 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 26 Jan 2016 17:29:33 +0100 Subject: [PATCH 3/3] Add instrumentation and refactor things around "rushed mode" --- storage/local/storage.go | 205 +++++++++++++++++++++------------------ 1 file changed, 112 insertions(+), 93 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 3afc9fb86d..3d5aeed562 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -41,26 +41,21 @@ const ( // Constants to control the hysteresis of entering and leaving "rushed // mode". In rushed mode, the dirty series count is ignored for - // checkpointing, and series files are not synced if the adaptive sync - // strategy is used. - // - // If we reach 80% of -storage.local.max-chunks-to-persist, we enter - // "rushed mode". - factorChunksToPersistForEnteringRushedMode = 0.8 - // To leave "rushed mode", we must be below 70% of - // -storage.local.max-chunks-to-persist. - factorChunksToPersistForLeavingRushedMode = 0.7 - // To enter "rushed mode" for other reasons (see below), we must have at - // least 30% of -storage.local.max-chunks-to-persist. - factorMinChunksToPersistToAllowRushedMode = 0.3 - // If the number of chunks in memory reaches 110% of - // -storage.local.memory-chunks, we will enter "rushed mode" (provided - // we have enough chunks to persist at all, see - // factorMinChunksToPersistToAllowRushedMode.) 
- factorMemChunksForEnteringRushedMode = 1.1 - // To leave "rushed mode", we must be below 105% of - // -storage.local.memory-chunks. - factorMemChunksForLeavingRushedMode = 1.05 + // checkpointing, series are maintained as frequently as possible, and + // series files are not synced if the adaptive sync strategy is used. + persintenceUrgencyScoreForEnteringRushedMode = 0.8 + persintenceUrgencyScoreForLeavingRushedMode = 0.7 + + // This factor times -storage.local.memory-chunks is the number of + // memory chunks we tolerate before suspending ingestion (TODO!). It is + // also a basis for calculating the persistenceUrgencyScore. + toleranceFactorForMemChunks = 1.1 + // This factor times -storage.local.max-chunks-to-persist is the minimum + // required number of chunks waiting for persistence before the number + // of chunks in memory may influence the persistenceUrgencyScore. (In + // other words: if there are no chunks to persist, it doesn't help chunk + // eviction if we speed up persistence.) + factorMinChunksToPersist = 0.2 ) var ( @@ -155,6 +150,8 @@ type memorySeriesStorage struct { outOfOrderSamplesCount prometheus.Counter invalidPreloadRequestsCount prometheus.Counter maintainSeriesDuration *prometheus.SummaryVec + persistenceUrgencyScore prometheus.Gauge + rushedMode prometheus.Gauge } // MemorySeriesStorageOptions contains options needed by @@ -243,6 +240,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { }, []string{seriesLocationLabel}, ), + persistenceUrgencyScore: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persistence_urgency_score", + Help: "A score of urgency to persist chunks, 0 is least urgent, 1 most.", + }), + rushedMode: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rushed_mode", + Help: "1 if the storage is in rushed mode, 0 otherwise. 
In rushed mode, the system behaves as if the persistence_urgency_score is 1.", + }), } return s } @@ -256,7 +265,7 @@ func (s *memorySeriesStorage) Start() (err error) { case Always: syncStrategy = func() bool { return true } case Adaptive: - syncStrategy = func() bool { return !s.inRushedMode() } + syncStrategy = func() bool { return s.calculatePersistenceUrgencyScore() < 1 } default: panic("unknown sync strategy") } @@ -823,8 +832,8 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger case <-s.loopStopping: return } - // Reduce the wait time by the backlog score. - s.waitForNextFP(s.fpToSeries.length(), s.persistenceBacklogScore()) + // Reduce the wait time according to the urgency score. + s.waitForNextFP(s.fpToSeries.length(), 1-s.calculatePersistenceUrgencyScore()) count++ } if count > 0 { @@ -916,8 +925,9 @@ loop: // would be counterproductive, as it would slow down chunk persisting even more, // while in a situation like that, where we are clearly lacking speed of disk // maintenance, the best we can do for crash recovery is to persist chunks as - // quickly as possible. So only checkpoint if the storage is not in "rushed mode". - if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.inRushedMode() { + // quickly as possible. So only checkpoint if the urgency score is < 1. + if dirtySeriesCount >= s.checkpointDirtySeriesLimit && + s.calculatePersistenceUrgencyScore() < 1 { checkpointTimer.Reset(0) } } @@ -1161,78 +1171,83 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) { atomic.AddInt64(&s.numChunksToPersist, int64(by)) } -// inRushedMode returns whether the storage is in "rushed mode", which is the -// case if there are too many chunks waiting for persistence or there are too -// many chunks in memory. The method is not goroutine safe (but only ever called -// from the goroutine dealing with series maintenance). Changes of degradation -// mode are logged. 
-func (s *memorySeriesStorage) inRushedMode() bool { - chunksToPersist := float64(s.getNumChunksToPersist()) - memChunks := float64(atomic.LoadInt64(&numMemChunks)) - - if s.rushed { - // We are already in rushed mode, so check if we can get out of - // it, using the lower hysteresis thresholds. - s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForLeavingRushedMode || - memChunks > float64(s.maxMemoryChunks)*factorMemChunksForLeavingRushedMode - if !s.rushed { - log.Warn("Storage has left rushed mode. Things are back to normal.") - } - return s.rushed - } - // We are not rushed yet, so check the higher hysteresis threshold if we enter it now. - // First WRT chunksToPersist... - s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode - if s.rushed { - log.Warnf( - "%.0f chunks waiting for persistence (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", - chunksToPersist, - chunksToPersist*100/float64(s.maxChunksToPersist), - s.maxChunksToPersist, - s.checkpointInterval, - ) - return true - } - // ...then WRT memChunks. - s.rushed = memChunks > float64(s.maxMemoryChunks)*factorMemChunksForEnteringRushedMode && - chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode - if s.rushed { - log.Warnf( - "%.0f chunks in memory (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. 
Series maintenance happens as frequently as possible.", - memChunks, - memChunks*100/float64(s.maxMemoryChunks), - s.maxMemoryChunks, - s.checkpointInterval, - ) - } - return s.rushed -} - -// persistenceBacklogScore works similar to inRushedMode, but returns a score -// about how close we are to degradation. This score is 1.0 if no chunks are -// waiting for persistence or we are not over the threshold for memory chunks, -// and 0.0 if we are at or above the thresholds. However, the score is always 0 -// if the storage is currently in rushed mode. (Getting out of it has a -// hysteresis, so we might be below thresholds again but still in rushed mode.) -func (s *memorySeriesStorage) persistenceBacklogScore() float64 { - if s.inRushedMode() { - return 0 - } - - chunksToPersist := float64(s.getNumChunksToPersist()) - score := 1 - chunksToPersist/(float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode) - - if chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode { - memChunks := float64(atomic.LoadInt64(&numMemChunks)) - score = math.Min( +// calculatePersistenceUrgencyScore calculates and returns an urgency score for +// the speed of persisting chunks. The score is between 0 and 1, where 0 means +// no urgency at all and 1 means highest urgency. +// +// The score is the maximum of the two following sub-scores: +// +// (1) The first sub-score is the number of chunks waiting for persistence +// divided by the maximum number of chunks allowed to be waiting for +// persistence. +// +// (2) If there are more chunks in memory than allowed AND there are more chunks +// waiting for persistence than factorMinChunksToPersist times +// -storage.local.max-chunks-to-persist, then the second sub-score is the +// fraction the number of memory chunks has reached between +// -storage.local.memory-chunks and toleranceFactorForMemChunks times +// -storage.local.memory-chunks. 
+// +// Should the score ever hit persintenceUrgencyScoreForEnteringRushedMode, the +// storage locks into "rushed mode", in which the returned score is always +// bumped up to 1 until the non-bumped score is below +// persintenceUrgencyScoreForLeavingRushedMode. +// +// This method is not goroutine-safe, but it is only ever called by the single +// goroutine that is in charge of series maintenance. According to the returned +// score, series maintenance should be sped up. If a score of 1 is returned, +// checkpointing based on dirty-series count should be disabled, and series +// files should not be synced anymore provided the user has specified the +// adaptive sync strategy. +func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { + var ( + chunksToPersist = float64(s.getNumChunksToPersist()) + maxChunksToPersist = float64(s.maxChunksToPersist) + memChunks = float64(atomic.LoadInt64(&numMemChunks)) + maxMemChunks = float64(s.maxMemoryChunks) + ) + score := chunksToPersist / maxChunksToPersist + if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist { + score = math.Max( score, - 1-(memChunks/float64(s.maxMemoryChunks)-1)/(factorMemChunksForEnteringRushedMode-1), + (memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1), ) } - if score < 0 { - return 0 - } if score > 1 { + score = 1 + } + s.persistenceUrgencyScore.Set(score) + + if s.rushed { + // We are already in rushed mode. If the score is still above + // persintenceUrgencyScoreForLeavingRushedMode, return 1 and + // leave things as they are. + if score > persintenceUrgencyScoreForLeavingRushedMode { + return 1 + } + // We are out of rushed mode! + s.rushed = false + s.rushedMode.Set(0) + log. + With("urgencyScore", score). + With("chunksToPersist", chunksToPersist). + With("maxChunksToPersist", maxChunksToPersist). + With("memoryChunks", memChunks). + With("maxMemoryChunks", maxMemChunks). 
+ Warn("Storage has left rushed mode.") + return score + } + if score > persintenceUrgencyScoreForEnteringRushedMode { + // Enter rushed mode. + s.rushed = true + s.rushedMode.Set(1) + log. + With("urgencyScore", score). + With("chunksToPersist", chunksToPersist). + With("maxChunksToPersist", maxChunksToPersist). + With("memoryChunks", memChunks). + With("maxMemoryChunks", maxMemChunks). + Warn("Storage has entered rushed mode.") return 1 } return score @@ -1253,6 +1268,8 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { ch <- s.invalidPreloadRequestsCount.Desc() ch <- numMemChunksDesc s.maintainSeriesDuration.Describe(ch) + ch <- s.persistenceUrgencyScore.Desc() + ch <- s.rushedMode.Desc() } // Collect implements prometheus.Collector. @@ -1282,4 +1299,6 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { float64(atomic.LoadInt64(&numMemChunks)), ) s.maintainSeriesDuration.Collect(ch) + ch <- s.persistenceUrgencyScore + ch <- s.rushedMode }