mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
tsdb: Add series_state.json file to wal/ directory to track state (#18303)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Compliance testing (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Compliance testing (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
* Add series_state.json file creation and updation logic. Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Make comments follow the guidelines. Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Fix linter complaints Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Put PR behind feature flag fast-startup Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Marshal updated information to file directly Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Fix linter failures Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Move series state code from head.go to head_wal.go Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Fix nits Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Add unit test Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> --------- Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com>
This commit is contained in:
parent
7df2d13f00
commit
df61021436
5 changed files with 147 additions and 2 deletions
|
|
@ -330,6 +330,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
|
|||
}
|
||||
c.tsdb.UseUncachedIO = true
|
||||
logger.Info("Experimental Uncached IO is enabled.")
|
||||
case "fast-startup":
|
||||
c.tsdb.EnableFastStartup = true
|
||||
logger.Info("Experimental fast startup is enabled.")
|
||||
default:
|
||||
logger.Warn("Unknown option for --enable-feature", "option", o)
|
||||
}
|
||||
|
|
@ -2033,6 +2036,7 @@ type tsdbOptions struct {
|
|||
EnableSTStorage bool
|
||||
EnableXOR2Encoding bool
|
||||
StaleSeriesCompactionThreshold float64
|
||||
EnableFastStartup bool
|
||||
}
|
||||
|
||||
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
||||
|
|
@ -2064,6 +2068,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
|||
EnableSTStorage: opts.EnableSTStorage,
|
||||
EnableXOR2Encoding: opts.EnableXOR2Encoding,
|
||||
StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold,
|
||||
EnableFastStartup: opts.EnableFastStartup,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -271,6 +271,9 @@ type Options struct {
|
|||
|
||||
// FsSizeFunc is a function returning the total disk size for a given path.
|
||||
FsSizeFunc FsSizeFunc
|
||||
|
||||
// EnableFastStartup enables scraping in parallel with WAL replay but with queries still disabled.
|
||||
EnableFastStartup bool
|
||||
}
|
||||
|
||||
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
|
||||
|
|
@ -1084,6 +1087,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
|
|||
headOpts.EnableSTStorage.Store(opts.EnableSTStorage)
|
||||
headOpts.EnableXOR2Encoding.Store(opts.EnableXOR2Encoding)
|
||||
headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords
|
||||
headOpts.EnableFastStartup = opts.EnableFastStartup
|
||||
if opts.WALReplayConcurrency > 0 {
|
||||
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
|
||||
}
|
||||
|
|
|
|||
28
tsdb/head.go
28
tsdb/head.go
|
|
@ -136,6 +136,10 @@ type Head struct {
|
|||
closedMtx sync.Mutex
|
||||
closed bool
|
||||
|
||||
// For running the background goroutine which updates series_state.json.
|
||||
seriesStateQuit chan struct{}
|
||||
seriesStateWg sync.WaitGroup
|
||||
|
||||
stats *HeadStats
|
||||
reg prometheus.Registerer
|
||||
|
||||
|
|
@ -210,6 +214,9 @@ type HeadOptions struct {
|
|||
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
|
||||
// is implemented.
|
||||
EnableMetadataWALRecords bool
|
||||
|
||||
// EnableFastStartup enables scraping in parallel with WAL replay but with queries still disabled.
|
||||
EnableFastStartup bool
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
@ -294,8 +301,9 @@ func NewHead(r prometheus.Registerer, l *slog.Logger, wal, wbl *wlog.WL, opts *H
|
|||
return &memChunk{}
|
||||
},
|
||||
},
|
||||
stats: stats,
|
||||
reg: r,
|
||||
stats: stats,
|
||||
reg: r,
|
||||
seriesStateQuit: make(chan struct{}),
|
||||
}
|
||||
if err := h.resetInMemoryState(); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -901,6 +909,13 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
"total_replay_duration", totalReplayDuration.String(),
|
||||
)
|
||||
|
||||
// TODO(RushabhMehta2005): Remove this 'if' block and always run the series state ticker when the feature is fully implemented.
|
||||
if h.opts.EnableFastStartup {
|
||||
// Start the background goroutine that writes to series_state.json.
|
||||
h.seriesStateWg.Add(1)
|
||||
go h.runSeriesStateTicker()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -1817,6 +1832,15 @@ func (h *Head) Close() error {
|
|||
defer h.closedMtx.Unlock()
|
||||
h.closed = true
|
||||
|
||||
// Stop the background series_state.json writer.
|
||||
if h.opts.EnableFastStartup && h.seriesStateQuit != nil {
|
||||
close(h.seriesStateQuit)
|
||||
h.seriesStateWg.Wait()
|
||||
h.seriesStateQuit = nil
|
||||
// Flush the final clean state.
|
||||
h.writeSeriesState(true)
|
||||
}
|
||||
|
||||
// mmap all but last chunk in case we're performing snapshot since that only
|
||||
// takes samples from most recent head chunk.
|
||||
h.mmapHeadChunks()
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ package tsdb
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
|
|
@ -7871,3 +7872,46 @@ func TestWALReplayRaceWithStaleSeriesCompaction(t *testing.T) {
|
|||
require.Equal(t, uint64(numNewSeries), head.NumSeries())
|
||||
require.NoError(t, head.Close())
|
||||
}
|
||||
|
||||
func TestHead_FastStartupStateFile(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(1000, false)
|
||||
// Enable the fast startup feature.
|
||||
opts.EnableFastStartup = true
|
||||
|
||||
head, w := newTestHeadWithOptions(t, compression.None, opts)
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
// Add a single sample to the Head.
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Append(0, labels.FromStrings("fast", "startup"), 100, 1.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Wait for the ticker to update the series state file.
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
|
||||
stateFilePath := filepath.Join(w.Dir(), "series_state.json")
|
||||
|
||||
b, err := os.ReadFile(stateFilePath)
|
||||
require.NoError(t, err, "series_state.json should exist after ticker runs")
|
||||
|
||||
var state SeriesLifecycleState
|
||||
require.NoError(t, json.Unmarshal(b, &state), "file should be valid JSON")
|
||||
|
||||
// The ticker should write an unclean state.
|
||||
require.False(t, state.CleanShutdown, "ticker should write CleanShutdown: false")
|
||||
require.Equal(t, uint64(1), state.LastSeriesID, "LastSeriesID should be 1 after adding our sample")
|
||||
require.Equal(t, 0, state.LastWALSegment, "LastWALSegment should be 0 on a fresh WAL")
|
||||
|
||||
// Perform a clean shutdown.
|
||||
require.NoError(t, head.Close())
|
||||
|
||||
b, err = os.ReadFile(stateFilePath)
|
||||
require.NoError(t, err, "series_state.json should still exist after Close()")
|
||||
require.NoError(t, json.Unmarshal(b, &state))
|
||||
|
||||
// Calling head.Close() should put us in the clean state.
|
||||
require.True(t, state.CleanShutdown, "Close() should write CleanShutdown: true")
|
||||
require.Equal(t, uint64(1), state.LastSeriesID, "LastSeriesID should remain 1")
|
||||
require.Equal(t, 0, state.LastWALSegment, "LastWALSegment should remain 0")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
|
|
@ -1805,3 +1806,70 @@ Outer:
|
|||
|
||||
return snapIdx, snapOffset, refSeries, nil
|
||||
}
|
||||
|
||||
// seriesStateFilename is the name of the file, inside the WAL directory, used to store the series state.
|
||||
const seriesStateFilename = "series_state.json"
|
||||
|
||||
// SeriesLifecycleState describes the information we record in the series_state.json file.
type SeriesLifecycleState struct {
	// LastSeriesID is the value of the Head's last-series-ID counter at the
	// time the state was written.
	LastSeriesID uint64 `json:"last_series_id"`
	// LastWALSegment is the index of the last WAL segment at the time the
	// state was written, or -1 if it could not be determined.
	LastWALSegment int `json:"last_wal_segment"`
	// CleanShutdown is true only for the state written by Head.Close; the
	// periodic background writer always records false.
	CleanShutdown bool `json:"clean_shutdown"`
}
|
||||
|
||||
// Atomically writes the current series state to disk.
|
||||
func (h *Head) writeSeriesState(cleanShutdown bool) {
|
||||
if h.wal == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Find the last segment number by checking the wal/ directory.
|
||||
last, _, err := h.wal.LastSegmentAndOffset()
|
||||
if err != nil {
|
||||
h.logger.Warn("Failed to get WAL segments for series state", "err", err)
|
||||
last = -1
|
||||
}
|
||||
|
||||
state := SeriesLifecycleState{
|
||||
LastSeriesID: h.lastSeriesID.Load(),
|
||||
LastWALSegment: last,
|
||||
CleanShutdown: cleanShutdown,
|
||||
}
|
||||
|
||||
path := filepath.Join(h.wal.Dir(), seriesStateFilename)
|
||||
tmpPath := path + ".tmp"
|
||||
|
||||
f, err := os.Create(tmpPath)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to create temp series state file", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(f).Encode(state); err != nil {
|
||||
h.logger.Error("Failed to encode series state", "err", err)
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
|
||||
f.Close()
|
||||
|
||||
if err := os.Rename(tmpPath, path); err != nil {
|
||||
h.logger.Error("Failed to rename the temporary series state file", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// runSeriesStateTicker writes the series state to disk every second.
|
||||
func (h *Head) runSeriesStateTicker() {
|
||||
defer h.seriesStateWg.Done()
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.writeSeriesState(false)
|
||||
case <-h.seriesStateQuit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue