From 222353a433fa7058a4683be4b8fa7455fc8bf7fe Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 13 Jan 2026 17:29:57 -0800 Subject: [PATCH] Fixes, comments, touchups Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 12 ++++++------ tsdb/head.go | 29 +++++++++++++++++------------ tsdb/head_read.go | 17 ++++++++++------- tsdb/head_wal.go | 9 +++++++-- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 23240358ef..0c6957a53a 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -100,7 +100,7 @@ func DefaultOptions() *Options { // Options of the DB storage. type Options struct { - // staleSeriesImmediateCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks. + // staleSeriesCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks. // This is the one that must be used by the code. staleSeriesCompactionThreshold atomic.Float64 @@ -1172,7 +1172,6 @@ func (db *DB) run(ctx context.Context) { } // We attempt mmapping of head chunks regularly. db.head.mmapHeadChunks() - case <-db.compactc: db.metrics.compactionsTriggered.Inc() @@ -1188,7 +1187,6 @@ func (db *DB) run(ctx context.Context) { db.metrics.compactionsSkipped.Inc() } db.autoCompactMtx.Unlock() - case <-db.stopc: return } @@ -1608,8 +1606,8 @@ func (db *DB) compactHead(head *RangeHead) error { return nil } -// CompactStaleHead compacts all the stale series that do no have out-of-order data into persistent blocks. -// If a stale series has out-of-order data, it is not possible to tell if the series stopped getting any data completely. +// CompactStaleHead compacts all the stale series that do not have out-of-order data into persistent blocks. +// If a stale series has out-of-order data, it is not possible to tell if the series stopped getting data completely. 
func (db *DB) CompactStaleHead() error { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -1617,6 +1615,8 @@ func (db *DB) CompactStaleHead() error { db.logger.Info("Starting stale series compaction") start := time.Now() + // We get the stale series references first because this list can change during the compaction below. + // It is more efficient and easier to provide an index interface for the stale series when we have a static list. staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background()) if err != nil { return err @@ -1650,7 +1650,7 @@ func (db *DB) CompactStaleHead() error { } db.head.RebuildSymbolTable(db.logger) - db.logger.Info("Ending stale series compaction", "duration", time.Since(start)) + db.logger.Info("Ending stale series compaction", "num_series", meta.Stats.NumSeries, "duration", time.Since(start)) return nil } diff --git a/tsdb/head.go b/tsdb/head.go index 5ab5d9dc34..77b49ca1cb 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1203,14 +1203,23 @@ func (h *Head) truncateMemory(mint int64) (err error) { return h.truncateSeriesAndChunkDiskMapper("truncateMemory") } +// truncateStaleSeries removes the provided series as long as they are still stale. func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) error { h.chunkSnapshotMtx.Lock() defer h.chunkSnapshotMtx.Unlock() + if h.MinTime() >= maxt { + return nil + } + + h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt) + + deleted := h.gcStaleSeries(seriesRefs, maxt) + // Record these stale series refs in the WAL so that we can ignore them during replay. 
if h.wal != nil { stones := make([]tombstones.Stone, 0, len(seriesRefs)) - for _, ref := range seriesRefs { + for ref := range deleted { stones = append(stones, tombstones.Stone{ Ref: ref, Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}}, @@ -1221,14 +1230,6 @@ func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) e return err } } - - if h.MinTime() >= maxt { - return nil - } - - h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt) - - h.gcStaleSeries(seriesRefs, maxt) return nil } @@ -1732,9 +1733,10 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { return actualInOrderMint, minOOOTime, minMmapFile } -// gcStaleSeries removes all the stale series provided given that they are still stale +// gcStaleSeries removes all the provided series as long as they are still stale // and the series maxt is <= the given max. -func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) { +// The returned references are the series that got deleted. +func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) map[storage.SeriesRef]struct{} { // Drop old chunks and remember series IDs and hashes if they can be // deleted entirely. deleted, affected, chunksRemoved := h.series.gcStaleSeries(seriesRefs, maxt) @@ -1766,6 +1768,8 @@ func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) { } h.walExpiriesMtx.Unlock() } + + return deleted } // Tombstones returns a new reader over the head's tombstones. @@ -2198,7 +2202,8 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) { h.tombstones.DeleteTombstones(deleted) } -// TODO: add comments. +// gcStaleSeries removes all the stale series provided that they are still stale +// and the series maxt is <= the given max. 
func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) { var ( deleted = map[storage.SeriesRef]struct{}{} diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 48f842430a..f0a1331fbb 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -210,6 +210,9 @@ func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) } // headStaleIndexReader gives the stale series that have no out-of-order data. +// This is only used for stale series compaction at the moment, that will only ask for all +// the series during compaction. So to make that efficient, this index reader requires the +// pre-calculated list of stale series refs that can be returned without re-reading the Head. type headStaleIndexReader struct { *headIndexReader staleSeriesRefs []storage.SeriesRef @@ -249,7 +252,7 @@ func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, na // filterStaleSeriesAndSortPostings returns the stale series references from the given postings // that also do not have any out-of-order data. func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) { - series := make([]*memSeries, 0, 128) + series := make([]*memSeries, 0, 1024) notFoundSeriesCount := 0 for p.Next() { @@ -261,7 +264,8 @@ func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.Ser s.Lock() if s.ooo != nil { - // Has out-of-order data; skip it. + // Has out-of-order data; skip it because we cannot determine if a series + // is stale when it's getting out-of-order data. s.Unlock() continue } @@ -284,12 +288,11 @@ func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.Ser return labels.Compare(a.labels(), b.labels()) }) - // Convert back to list. 
- ep := make([]storage.SeriesRef, 0, len(series)) + refs := make([]storage.SeriesRef, 0, len(series)) for _, p := range series { - ep = append(ep, storage.SeriesRef(p.ref)) + refs = append(refs, storage.SeriesRef(p.ref)) } - return ep, nil + return refs, nil } // SortedPostings returns the postings as it is because we expect any postings obtained via @@ -307,7 +310,7 @@ func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.Se func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { for i, c := range s.mmappedChunks { - // Do not expose chunks that are outside the specified range. + // Do not expose chunks that are outside of the specified range. if !c.OverlapsClosedInterval(mint, maxt) { continue } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 5802a570e7..b323f0dbf6 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -308,14 +308,19 @@ Outer: } h.wlReplaySamplesPool.Put(v) case []tombstones.Stone: - // TODO: what if the ref doesnt match a series directly? Check about multiref! // Tombstone records will be fairly rare, so not trying to optimise the allocations here. deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency) for _, s := range v { if len(s.Intervals) == 1 && s.Intervals[0].Mint == math.MinInt64 && s.Intervals[0].Maxt == math.MaxInt64 { - // This series was fully deleted at this point. + // This series was fully deleted at this point. This record is only done for stale series at the moment. mod := uint64(s.Ref) % uint64(concurrency) deleteSeriesShards[mod] = append(deleteSeriesShards[mod], chunks.HeadSeriesRef(s.Ref)) + + // If the series is with a different reference, try deleting that. + if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok { + mod := uint64(r) % uint64(concurrency) + deleteSeriesShards[mod] = append(deleteSeriesShards[mod], r) + } continue } for _, itv := range s.Intervals {