diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f46f1fa64d..516a6c6d11 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -330,6 +330,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { } c.tsdb.UseUncachedIO = true logger.Info("Experimental Uncached IO is enabled.") + case "fast-startup": + c.tsdb.EnableFastStartup = true + logger.Info("Experimental fast startup is enabled.") default: logger.Warn("Unknown option for --enable-feature", "option", o) } @@ -2033,6 +2036,7 @@ type tsdbOptions struct { EnableSTStorage bool EnableXOR2Encoding bool StaleSeriesCompactionThreshold float64 + EnableFastStartup bool } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -2064,6 +2068,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableSTStorage: opts.EnableSTStorage, EnableXOR2Encoding: opts.EnableXOR2Encoding, StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold, + EnableFastStartup: opts.EnableFastStartup, } } diff --git a/tsdb/db.go b/tsdb/db.go index 2ca1bccf0d..fde5c5e22c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -271,6 +271,9 @@ type Options struct { // FsSizeFunc is a function returning the total disk size for a given path. FsSizeFunc FsSizeFunc + + // EnableFastStartup enables scraping in parallel with WAL replay but with queries still disabled. + EnableFastStartup bool } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -1084,6 +1087,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn headOpts.EnableSTStorage.Store(opts.EnableSTStorage) headOpts.EnableXOR2Encoding.Store(opts.EnableXOR2Encoding) headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords + headOpts.EnableFastStartup = opts.EnableFastStartup if opts.WALReplayConcurrency > 0 { headOpts.WALReplayConcurrency = opts.WALReplayConcurrency } diff --git a/tsdb/head.go b/tsdb/head.go index 838b4bb699..fb85691638 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -136,6 +136,10 @@ type Head struct { closedMtx sync.Mutex closed bool + // For running the background goroutine which updates series_state.json. + seriesStateQuit chan struct{} + seriesStateWg sync.WaitGroup + stats *HeadStats reg prometheus.Registerer @@ -210,6 +214,9 @@ type HeadOptions struct { // NOTE(bwplotka): This feature might be deprecated and removed once PROM-60 // is implemented. EnableMetadataWALRecords bool + + // EnableFastStartup enables scraping in parallel with WAL replay but with queries still disabled. + EnableFastStartup bool } const ( @@ -294,8 +301,9 @@ func NewHead(r prometheus.Registerer, l *slog.Logger, wal, wbl *wlog.WL, opts *H return &memChunk{} }, }, - stats: stats, - reg: r, + stats: stats, + reg: r, + seriesStateQuit: make(chan struct{}), } if err := h.resetInMemoryState(); err != nil { return nil, err @@ -901,6 +909,13 @@ func (h *Head) Init(minValidTime int64) error { "total_replay_duration", totalReplayDuration.String(), ) + // TODO(RushabhMehta2005): Remove this 'if' block and always run the series state ticker when the feature is fully implemented. + if h.opts.EnableFastStartup { + // Start the background goroutine that writes to series_state.json. + h.seriesStateWg.Add(1) + go h.runSeriesStateTicker() + } + return nil } @@ -1817,6 +1832,15 @@ func (h *Head) Close() error { defer h.closedMtx.Unlock() h.closed = true + // Stop the background series_state.json writer. + if h.opts.EnableFastStartup && h.seriesStateQuit != nil { + close(h.seriesStateQuit) + h.seriesStateWg.Wait() + h.seriesStateQuit = nil + // Flush the final clean state. + h.writeSeriesState(true) + } + // mmap all but last chunk in case we're performing snapshot since that only // takes samples from most recent head chunk. h.mmapHeadChunks() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index c04cd51278..8aaad13c0e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "encoding/json" "fmt" "io" "math" @@ -7871,3 +7872,46 @@ func TestWALReplayRaceWithStaleSeriesCompaction(t *testing.T) { require.Equal(t, uint64(numNewSeries), head.NumSeries()) require.NoError(t, head.Close()) } + +func TestHead_FastStartupStateFile(t *testing.T) { + opts := newTestHeadDefaultOptions(1000, false) + // Enable the fast startup feature. + opts.EnableFastStartup = true + + head, w := newTestHeadWithOptions(t, compression.None, opts) + require.NoError(t, head.Init(0)) + + // Add a single sample to the Head. + app := head.Appender(context.Background()) + _, err := app.Append(0, labels.FromStrings("fast", "startup"), 100, 1.0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Wait for the ticker to update the series state file. + time.Sleep(1500 * time.Millisecond) + + stateFilePath := filepath.Join(w.Dir(), "series_state.json") + + b, err := os.ReadFile(stateFilePath) + require.NoError(t, err, "series_state.json should exist after ticker runs") + + var state SeriesLifecycleState + require.NoError(t, json.Unmarshal(b, &state), "file should be valid JSON") + + // The ticker should write an unclean state. + require.False(t, state.CleanShutdown, "ticker should write CleanShutdown: false") + require.Equal(t, uint64(1), state.LastSeriesID, "LastSeriesID should be 1 after adding our sample") + require.Equal(t, 0, state.LastWALSegment, "LastWALSegment should be 0 on a fresh WAL") + + // Perform a clean shutdown. + require.NoError(t, head.Close()) + + b, err = os.ReadFile(stateFilePath) + require.NoError(t, err, "series_state.json should still exist after Close()") + require.NoError(t, json.Unmarshal(b, &state)) + + // Calling head.Close() should put us in the clean state. + require.True(t, state.CleanShutdown, "Close() should write CleanShutdown: true") + require.Equal(t, uint64(1), state.LastSeriesID, "LastSeriesID should remain 1") + require.Equal(t, 0, state.LastWALSegment, "LastWALSegment should remain 0") +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index b8837a3aa9..de8b19b6c1 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -14,6 +14,7 @@ package tsdb import ( + "encoding/json" "errors" "fmt" "maps" @@ -1805,3 +1806,70 @@ Outer: return snapIdx, snapOffset, refSeries, nil } + +// Name of the file used to store the state. +const seriesStateFilename = "series_state.json" + +// SeriesLifecycleState descibes the information we record in the series_state.json file. +type SeriesLifecycleState struct { + LastSeriesID uint64 `json:"last_series_id"` + LastWALSegment int `json:"last_wal_segment"` + CleanShutdown bool `json:"clean_shutdown"` +} + +// Atomically writes the current series state to disk. +func (h *Head) writeSeriesState(cleanShutdown bool) { + if h.wal == nil { + return + } + + // Find the last segment number by checking the wal/ directory. + last, _, err := h.wal.LastSegmentAndOffset() + if err != nil { + h.logger.Warn("Failed to get WAL segments for series state", "err", err) + last = -1 + } + + state := SeriesLifecycleState{ + LastSeriesID: h.lastSeriesID.Load(), + LastWALSegment: last, + CleanShutdown: cleanShutdown, + } + + path := filepath.Join(h.wal.Dir(), seriesStateFilename) + tmpPath := path + ".tmp" + + f, err := os.Create(tmpPath) + if err != nil { + h.logger.Error("Failed to create temp series state file", "err", err) + return + } + + if err := json.NewEncoder(f).Encode(state); err != nil { + h.logger.Error("Failed to encode series state", "err", err) + f.Close() + return + } + + f.Close() + + if err := os.Rename(tmpPath, path); err != nil { + h.logger.Error("Failed to rename the temporary series state file", "err", err) + } +} + +// runSeriesStateTicker writes the series state to disk every second. +func (h *Head) runSeriesStateTicker() { + defer h.seriesStateWg.Done() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.writeSeriesState(false) + case <-h.seriesStateQuit: + return + } + } +}