From 652f79b54dc63623a82cc538c5efa15834d8e79a Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 18 Aug 2025 15:58:41 -0700 Subject: [PATCH] Attempt to start scraping without waiting for WAL replay Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 63 ++++++++++++++++++++++++++++----------------- tsdb/head_append.go | 21 +++++++++++---- tsdb/head_wal.go | 7 +++++ 3 files changed, 63 insertions(+), 28 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 093ec5ab27..5b7ff9d2e3 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -285,6 +285,9 @@ type DB struct { blockQuerierFunc BlockQuerierFunc blockChunkQuerierFunc BlockChunkQuerierFunc + + isReady atomic.Bool + readyError atomic.Error } type dbMetrics struct { @@ -1006,34 +1009,40 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn minValidTime = inOrderMaxTime } - if initErr := db.head.Init(minValidTime); initErr != nil { - db.head.metrics.walCorruptionsTotal.Inc() - var e *errLoadWbl - if errors.As(initErr, &e) { - db.logger.Warn("Encountered WBL read error, attempting repair", "err", initErr) - if err := wbl.Repair(e.err); err != nil { - return nil, fmt.Errorf("repair corrupted WBL: %w", err) + go func() { + if initErr := db.head.Init(minValidTime); initErr != nil { + db.head.metrics.walCorruptionsTotal.Inc() + var e *errLoadWbl + if errors.As(initErr, &e) { + db.logger.Warn("Encountered WBL read error, attempting repair", "err", initErr) + if err := wbl.Repair(e.err); err != nil { + db.readyError.Store(fmt.Errorf("repair corrupted WBL: %w", err)) + return + } + db.logger.Info("Successfully repaired WBL") + } else { + db.logger.Warn("Encountered WAL read error, attempting repair", "err", initErr) + if err := wal.Repair(initErr); err != nil { + db.readyError.Store(fmt.Errorf("repair corrupted WAL: %w", err)) + return + } + db.logger.Info("Successfully repaired WAL") } - db.logger.Info("Successfully repaired WBL") - } else { - db.logger.Warn("Encountered WAL read error, attempting repair", "err", initErr) - if err := wal.Repair(initErr); err != nil { - return nil, fmt.Errorf("repair corrupted WAL: %w", err) - } - db.logger.Info("Successfully repaired WAL") } - } - if db.head.MinOOOTime() != int64(math.MaxInt64) { - // Some OOO data was replayed from the disk that needs compaction and cleanup. - db.oooWasEnabled.Store(true) - } + if db.head.MinOOOTime() != int64(math.MaxInt64) { + // Some OOO data was replayed from the disk that needs compaction and cleanup. + db.oooWasEnabled.Store(true) + } - if opts.EnableDelayedCompaction { - opts.CompactionDelay = db.generateCompactionDelay() - } + if opts.EnableDelayedCompaction { + opts.CompactionDelay = db.generateCompactionDelay() + } - go db.run(ctx) + db.run(ctx) + + db.isReady.Store(true) + }() return db, nil } @@ -1058,6 +1067,10 @@ func removeBestEffortTmpDirs(l *slog.Logger, dir string) error { return nil } +func (db *DB) Ready() (bool, error) { + return db.isReady.Load(), db.readyError.Load() +} + // StartTime implements the Storage interface. func (db *DB) StartTime() (int64, error) { db.mtx.RLock() @@ -2050,6 +2063,10 @@ func (db *DB) Snapshot(dir string, withHead bool) error { // Querier returns a new querier over the data partition for the given time range. func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { + if !db.isReady.Load() { + return nil, errors.New("db is not ready") + } + var blocks []BlockReader db.mtx.RLock() diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 43e523cae1..3867e714b8 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1222,7 +1222,7 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { acc.floatsAppended-- } default: - ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) + ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts, nil) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1560,8 +1560,8 @@ type chunkOpts struct { // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // Series lock must be held when calling. -func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o) +func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts, headChunks *memChunk) (sampleInOrder, chunkCreated bool) { + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o, headChunks) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1580,6 +1580,14 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa return true, chunkCreated } +func (s *memSeries) prependHeadChunks(headChunks *memChunk) { + if s.headChunks == nil { + s.headChunks = headChunks + } else { + s.headChunks.oldest().prev = headChunks + } +} + // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. @@ -1698,13 +1706,16 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // number of samples they contain with a soft cap in bytes. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. -func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts, headChunks *memChunk) (c *memChunk, sampleInOrder, chunkCreated bool) { // We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut // a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a // maximally-sized sample (19 bytes). const maxBytesPerXORChunk = chunkenc.MaxBytesPerXORChunk - 19 - c = s.headChunks + c = headChunks + if c == nil { + c = s.headChunks + } if c == nil { if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ee6557fdad..e1255583c0 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -599,6 +599,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp missingSeries := make(map[chunks.HeadSeriesRef]struct{}) var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64 + headChunksMap := map[chunks.HeadSeriesRef]*memChunk{} + minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) appendChunkOpts := chunkOpts{ @@ -627,6 +629,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.T <= ms.mmMaxTime { continue } + + // TODO: change this to append to a temporary set of chunks and not clash with the head chunks. + // TODO: Even if you pass a temporary chunk, we need the temporary appender. Fix that. if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -659,8 +664,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } var chunkCreated bool if s.h != nil { + // TODO: change this to append to a temporary set of chunks and not clash with the head chunks. _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) } else { + // TODO: change this to append to a temporary set of chunks and not clash with the head chunks. _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) } if chunkCreated {