diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index edb0cf5658..ffede437b1 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -668,6 +668,9 @@ func main() { } if cfgFile.StorageConfig.TSDBConfig != nil { cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow + cfg.tsdb.StaleSeriesCompactionInterval = time.Duration(cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval) + cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold + cfg.tsdb.StaleSeriesImmediateCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold } // Set Go runtime parameters before we get too far into initialization. @@ -1857,50 +1860,56 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) { // tsdbOptions is tsdb.Option version with defined units. // This is required as tsdb.Option fields are unit agnostic (time). type tsdbOptions struct { - WALSegmentSize units.Base2Bytes - MaxBlockChunkSegmentSize units.Base2Bytes - RetentionDuration model.Duration - MaxBytes units.Base2Bytes - NoLockfile bool - WALCompressionType compression.Type - HeadChunksWriteQueueSize int - SamplesPerChunk int - StripeSize int - MinBlockDuration model.Duration - MaxBlockDuration model.Duration - OutOfOrderTimeWindow int64 - EnableExemplarStorage bool - MaxExemplars int64 - EnableMemorySnapshotOnShutdown bool - EnableNativeHistograms bool - EnableDelayedCompaction bool - CompactionDelayMaxPercent int - EnableOverlappingCompaction bool - UseUncachedIO bool + WALSegmentSize units.Base2Bytes + MaxBlockChunkSegmentSize units.Base2Bytes + RetentionDuration model.Duration + MaxBytes units.Base2Bytes + NoLockfile bool + WALCompressionType compression.Type + HeadChunksWriteQueueSize int + SamplesPerChunk int + StripeSize int + MinBlockDuration model.Duration + MaxBlockDuration model.Duration + OutOfOrderTimeWindow int64 + EnableExemplarStorage bool + MaxExemplars 
int64 + EnableMemorySnapshotOnShutdown bool + EnableNativeHistograms bool + EnableDelayedCompaction bool + CompactionDelayMaxPercent int + EnableOverlappingCompaction bool + UseUncachedIO bool + StaleSeriesCompactionInterval time.Duration + StaleSeriesCompactionThreshold float64 + StaleSeriesImmediateCompactionThreshold float64 } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { return tsdb.Options{ - WALSegmentSize: int(opts.WALSegmentSize), - MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize), - RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), - MaxBytes: int64(opts.MaxBytes), - NoLockfile: opts.NoLockfile, - WALCompression: opts.WALCompressionType, - HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, - SamplesPerChunk: opts.SamplesPerChunk, - StripeSize: opts.StripeSize, - MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), - MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), - EnableExemplarStorage: opts.EnableExemplarStorage, - MaxExemplars: opts.MaxExemplars, - EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown, - EnableNativeHistograms: opts.EnableNativeHistograms, - OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow, - EnableDelayedCompaction: opts.EnableDelayedCompaction, - CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, - EnableOverlappingCompaction: opts.EnableOverlappingCompaction, - UseUncachedIO: opts.UseUncachedIO, + WALSegmentSize: int(opts.WALSegmentSize), + MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize), + RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), + MaxBytes: int64(opts.MaxBytes), + NoLockfile: opts.NoLockfile, + WALCompression: opts.WALCompressionType, + HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, + SamplesPerChunk: opts.SamplesPerChunk, + StripeSize: opts.StripeSize, + MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / 
time.Millisecond), + MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), + EnableExemplarStorage: opts.EnableExemplarStorage, + MaxExemplars: opts.MaxExemplars, + EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown, + EnableNativeHistograms: opts.EnableNativeHistograms, + OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow, + EnableDelayedCompaction: opts.EnableDelayedCompaction, + CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, + EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + UseUncachedIO: opts.UseUncachedIO, + StaleSeriesCompactionInterval: opts.StaleSeriesCompactionInterval, + StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold, + StaleSeriesImmediateCompactionThreshold: opts.StaleSeriesImmediateCompactionThreshold, } } diff --git a/config/config.go b/config/config.go index 058b4bf881..b1551c9840 100644 --- a/config/config.go +++ b/config/config.go @@ -1021,6 +1021,19 @@ type TSDBConfig struct { // During unmarshall, this is converted into milliseconds and stored in OutOfOrderTimeWindow. // This should not be used directly and must be converted into OutOfOrderTimeWindow. OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"` + + // StaleSeriesCompactionInterval tells at what interval to attempt stale series compaction + // if the number of stale series crosses the given threshold. + StaleSeriesCompactionInterval model.Duration `yaml:"stale_series_compaction_interval,omitempty"` + + // StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in + // the in-memory Head block. If the % of stale series crosses this threshold, stale series + // compaction will be run in the next stale series compaction interval. 
+ StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"` + + // StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in + // the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately. + StaleSeriesImmediateCompactionThreshold float64 `yaml:"stale_series_immediate_compaction_threshold,omitempty"` } // UnmarshalYAML implements the yaml.Unmarshaler interface. diff --git a/tsdb/db.go b/tsdb/db.go index adef3a9efd..85d7237daf 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -98,6 +98,16 @@ func DefaultOptions() *Options { // Options of the DB storage. type Options struct { + // staleSeriesCompactionInterval is the same as the option below with the same name, but is atomic so that we can do live updates without locks. + // This is the one that must be used by the code. It holds the interval as int64(time.Duration). + staleSeriesCompactionInterval atomic.Int64 + // staleSeriesCompactionThreshold is the same as the option below with the same name, but is atomic so that we can do live updates without locks. + // This is the one that must be used by the code. + staleSeriesCompactionThreshold atomic.Float64 + // staleSeriesImmediateCompactionThreshold is the same as the option below with the same name, but is atomic so that we can do live updates without locks. + // This is the one that must be used by the code. + staleSeriesImmediateCompactionThreshold atomic.Float64 + // Segments (wal files) max size. // WALSegmentSize = 0, segment size is default size. // WALSegmentSize > 0, segment size is WALSegmentSize.
@@ -832,6 +842,10 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3) } + opts.staleSeriesCompactionInterval.Store(int64(opts.StaleSeriesCompactionInterval)) + opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold) + opts.staleSeriesImmediateCompactionThreshold.Store(opts.StaleSeriesImmediateCompactionThreshold) + return opts, rngs } @@ -1103,15 +1117,13 @@ func (db *DB) run(ctx context.Context) { backoff := time.Duration(0) - nextStaleSeriesCompactionTime := time.Now().Round(db.opts.StaleSeriesCompactionInterval) - if nextStaleSeriesCompactionTime.Before(time.Now()) { - nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval) - } - - staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime) - if db.opts.StaleSeriesCompactionInterval <= 0 { - // Long enough interval so that we don't schedule a stale series compaction. - staleSeriesWaitDur = 365 * 24 * time.Hour + staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load()) + nextStaleSeriesCompactionTime := nextStepAlignedTime(staleSeriesCompactionInterval) + timedStaleSeriesCompactionActive := true + if staleSeriesCompactionInterval <= 0 { + // Far enough so that we don't schedule a stale series compaction. + timedStaleSeriesCompactionActive = false + nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour) } for { @@ -1121,6 +1133,11 @@ func (db *DB) run(ctx context.Context) { case <-time.After(backoff): } + staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime) + if staleSeriesWaitDur < 0 { + staleSeriesWaitDur = 0 + } + select { case <-time.After(1 * time.Minute): db.cmtx.Lock() @@ -1132,8 +1149,8 @@ func (db *DB) run(ctx context.Context) { // TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon. 
numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries() staleSeriesRatio := float64(numStaleSeries) / float64(numSeries) - if db.autoCompact && db.opts.StaleSeriesImmediateCompactionThreshold > 0 && - staleSeriesRatio >= db.opts.StaleSeriesImmediateCompactionThreshold { + if db.autoCompact && db.opts.staleSeriesImmediateCompactionThreshold.Load() > 0 && + staleSeriesRatio >= db.opts.staleSeriesImmediateCompactionThreshold.Load() { if err := db.CompactStaleHead(); err != nil { db.logger.Error("immediate stale series compaction failed", "err", err) } @@ -1145,6 +1162,14 @@ func (db *DB) run(ctx context.Context) { } // We attempt mmapping of head chunks regularly. db.head.mmapHeadChunks() + + staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load()) + if !timedStaleSeriesCompactionActive && staleSeriesCompactionInterval > 0 { + // The config was updated in realtime. + timedStaleSeriesCompactionActive = true + nextStaleSeriesCompactionTime = nextStepAlignedTime(staleSeriesCompactionInterval) + } + case <-db.compactc: db.metrics.compactionsTriggered.Inc() @@ -1161,18 +1186,26 @@ func (db *DB) run(ctx context.Context) { } db.autoCompactMtx.Unlock() case <-time.After(staleSeriesWaitDur): + staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load()) + if staleSeriesCompactionInterval <= 0 { + // The config was updated in realtime. + // Far enough so that we don't schedule a stale series compaction. + timedStaleSeriesCompactionActive = false + nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour) + continue + } + // TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon. 
numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries() staleSeriesRatio := float64(numStaleSeries) / float64(numSeries) - if db.autoCompact && db.opts.StaleSeriesCompactionThreshold > 0 && - staleSeriesRatio >= db.opts.StaleSeriesCompactionThreshold { + if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 && + staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() { if err := db.CompactStaleHead(); err != nil { db.logger.Error("scheduled stale series compaction failed", "err", err) } } - nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval) - staleSeriesWaitDur = time.Until(nextStaleSeriesCompactionTime) + nextStaleSeriesCompactionTime = nextStepAlignedTime(staleSeriesCompactionInterval) case <-db.stopc: return @@ -1180,6 +1213,14 @@ func (db *DB) run(ctx context.Context) { } } +func nextStepAlignedTime(step time.Duration) (next time.Time) { + next = time.Now().Round(step) + if next.Before(time.Now()) { + next = next.Add(step) + } + return +} + // Appender opens a new appender against the database.
func (db *DB) Appender(ctx context.Context) storage.Appender { return dbAppender{db: db, Appender: db.head.Appender(ctx)} @@ -1206,6 +1247,13 @@ func (db *DB) ApplyConfig(conf *config.Config) error { oooTimeWindow := int64(0) if conf.StorageConfig.TSDBConfig != nil { oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow + db.opts.staleSeriesCompactionInterval.Store(int64(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval)) + db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold) + db.opts.staleSeriesImmediateCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold) + } else { + db.opts.staleSeriesCompactionInterval.Store(0) + db.opts.staleSeriesCompactionThreshold.Store(0) + db.opts.staleSeriesImmediateCompactionThreshold.Store(0) } if oooTimeWindow < 0 { oooTimeWindow = 0 diff --git a/tsdb/head.go b/tsdb/head.go index 3fd9285a47..f17d11d5ad 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1756,7 +1756,7 @@ func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) { // that reads the WAL, wouldn't be able to use those // samples since we would have no labels for that ref ID. for ref := range deleted { - h.walExpiries[chunks.HeadSeriesRef(ref)] = last + h.walExpiries[chunks.HeadSeriesRef(ref)] = int64(last) } h.walExpiriesMtx.Unlock() }