Attempt to start scraping without waiting for WAL replay

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2025-08-18 15:58:41 -07:00
parent 93bbf4bc90
commit 652f79b54d
3 changed files with 63 additions and 28 deletions

View file

@ -285,6 +285,9 @@ type DB struct {
blockQuerierFunc BlockQuerierFunc
blockChunkQuerierFunc BlockChunkQuerierFunc
isReady atomic.Bool
readyError atomic.Error
}
type dbMetrics struct {
@ -1006,34 +1009,40 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
minValidTime = inOrderMaxTime
}
if initErr := db.head.Init(minValidTime); initErr != nil {
db.head.metrics.walCorruptionsTotal.Inc()
var e *errLoadWbl
if errors.As(initErr, &e) {
db.logger.Warn("Encountered WBL read error, attempting repair", "err", initErr)
if err := wbl.Repair(e.err); err != nil {
return nil, fmt.Errorf("repair corrupted WBL: %w", err)
go func() {
if initErr := db.head.Init(minValidTime); initErr != nil {
db.head.metrics.walCorruptionsTotal.Inc()
var e *errLoadWbl
if errors.As(initErr, &e) {
db.logger.Warn("Encountered WBL read error, attempting repair", "err", initErr)
if err := wbl.Repair(e.err); err != nil {
db.readyError.Store(fmt.Errorf("repair corrupted WBL: %w", err))
return
}
db.logger.Info("Successfully repaired WBL")
} else {
db.logger.Warn("Encountered WAL read error, attempting repair", "err", initErr)
if err := wal.Repair(initErr); err != nil {
db.readyError.Store(fmt.Errorf("repair corrupted WAL: %w", err))
return
}
db.logger.Info("Successfully repaired WAL")
}
db.logger.Info("Successfully repaired WBL")
} else {
db.logger.Warn("Encountered WAL read error, attempting repair", "err", initErr)
if err := wal.Repair(initErr); err != nil {
return nil, fmt.Errorf("repair corrupted WAL: %w", err)
}
db.logger.Info("Successfully repaired WAL")
}
}
if db.head.MinOOOTime() != int64(math.MaxInt64) {
// Some OOO data was replayed from the disk that needs compaction and cleanup.
db.oooWasEnabled.Store(true)
}
if db.head.MinOOOTime() != int64(math.MaxInt64) {
// Some OOO data was replayed from the disk that needs compaction and cleanup.
db.oooWasEnabled.Store(true)
}
if opts.EnableDelayedCompaction {
opts.CompactionDelay = db.generateCompactionDelay()
}
if opts.EnableDelayedCompaction {
opts.CompactionDelay = db.generateCompactionDelay()
}
go db.run(ctx)
db.run(ctx)
db.isReady.Store(true)
}()
return db, nil
}
@ -1058,6 +1067,10 @@ func removeBestEffortTmpDirs(l *slog.Logger, dir string) error {
return nil
}
func (db *DB) Ready() (bool, error) {
return db.isReady.Load(), db.readyError.Load()
}
// StartTime implements the Storage interface.
func (db *DB) StartTime() (int64, error) {
db.mtx.RLock()
@ -2050,6 +2063,10 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
// Querier returns a new querier over the data partition for the given time range.
func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
if !db.isReady.Load() {
return nil, errors.New("db is not ready")
}
var blocks []BlockReader
db.mtx.RLock()

View file

@ -1222,7 +1222,7 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
acc.floatsAppended--
}
default:
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts, nil)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1560,8 +1560,8 @@ type chunkOpts struct {
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts, headChunks *memChunk) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o, headChunks)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1580,6 +1580,14 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
return true, chunkCreated
}
func (s *memSeries) prependHeadChunks(headChunks *memChunk) {
if s.headChunks == nil {
s.headChunks = headChunks
} else {
s.headChunks.oldest().prev = headChunks
}
}
// appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
@ -1698,13 +1706,16 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
// number of samples they contain with a soft cap in bytes.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts, headChunks *memChunk) (c *memChunk, sampleInOrder, chunkCreated bool) {
// We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut
// a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a
// maximally-sized sample (19 bytes).
const maxBytesPerXORChunk = chunkenc.MaxBytesPerXORChunk - 19
c = s.headChunks
c = headChunks
if c == nil {
c = s.headChunks
}
if c == nil {
if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {

View file

@ -599,6 +599,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
missingSeries := make(map[chunks.HeadSeriesRef]struct{})
var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64
headChunksMap := map[chunks.HeadSeriesRef]*memChunk{}
minValidTime := h.minValidTime.Load()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
appendChunkOpts := chunkOpts{
@ -627,6 +629,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime {
continue
}
// TODO: change this to append to a temporary set of chunks and not clash with the head chunks.
// TODO: Even if you pass a temporary chunk, we need the temporary appender. Fix that.
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
@ -659,8 +664,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
}
var chunkCreated bool
if s.h != nil {
// TODO: change this to append to a temporary set of chunks and not clash with the head chunks.
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
} else {
// TODO: change this to append to a temporary set of chunks and not clash with the head chunks.
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
}
if chunkCreated {