Fixes, comments, touchups
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2026-01-13 17:29:57 -08:00
parent 3f51be0f54
commit 222353a433
4 changed files with 40 additions and 27 deletions

View file

@ -100,7 +100,7 @@ func DefaultOptions() *Options {
// Options of the DB storage.
type Options struct {
// staleSeriesImmediateCompactionThreshold is the same as the option below with the same name, but is atomic so that we can do live updates without locks.
// staleSeriesCompactionThreshold is the same as the option below with the same name, but is atomic so that we can do live updates without locks.
// This is the one that must be used by the code.
staleSeriesCompactionThreshold atomic.Float64
@ -1172,7 +1172,6 @@ func (db *DB) run(ctx context.Context) {
}
// We attempt mmapping of head chunks regularly.
db.head.mmapHeadChunks()
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
@ -1188,7 +1187,6 @@ func (db *DB) run(ctx context.Context) {
db.metrics.compactionsSkipped.Inc()
}
db.autoCompactMtx.Unlock()
case <-db.stopc:
return
}
@ -1608,8 +1606,8 @@ func (db *DB) compactHead(head *RangeHead) error {
return nil
}
// CompactStaleHead compacts all the stale series that do not have out-of-order data into persistent blocks.
// If a stale series has out-of-order data, it is not possible to tell if the series stopped getting any data completely.
// CompactStaleHead compacts all the stale series into persistent blocks that do not have out-of-order data.
// If a stale series has out-of-order data, it is not possible to tell if the series stopped getting data completely.
func (db *DB) CompactStaleHead() error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
@ -1617,6 +1615,8 @@ func (db *DB) CompactStaleHead() error {
db.logger.Info("Starting stale series compaction")
start := time.Now()
// We get the stale series reference first because this list can change during the compaction below.
// It is more efficient and easier to provide an index interface for the stale series when we have a static list.
staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
if err != nil {
return err
@ -1650,7 +1650,7 @@ func (db *DB) CompactStaleHead() error {
}
db.head.RebuildSymbolTable(db.logger)
db.logger.Info("Ending stale series compaction", "duration", time.Since(start))
db.logger.Info("Ending stale series compaction", "num_series", meta.Stats.NumSeries, "duration", time.Since(start))
return nil
}

View file

@ -1203,14 +1203,23 @@ func (h *Head) truncateMemory(mint int64) (err error) {
return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
}
// truncateStaleSeries removes the provided series as long as they are still stale.
func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) error {
h.chunkSnapshotMtx.Lock()
defer h.chunkSnapshotMtx.Unlock()
if h.MinTime() >= maxt {
return nil
}
h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
deleted := h.gcStaleSeries(seriesRefs, maxt)
// Record these stale series refs in the WAL so that we can ignore them during replay.
if h.wal != nil {
stones := make([]tombstones.Stone, 0, len(seriesRefs))
for _, ref := range seriesRefs {
for ref := range deleted {
stones = append(stones, tombstones.Stone{
Ref: ref,
Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}},
@ -1221,14 +1230,6 @@ func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) e
return err
}
}
if h.MinTime() >= maxt {
return nil
}
h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
h.gcStaleSeries(seriesRefs, maxt)
return nil
}
@ -1732,9 +1733,10 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
return actualInOrderMint, minOOOTime, minMmapFile
}
// gcStaleSeries removes all the stale series provided given that they are still stale
// gcStaleSeries removes all the provided series as long as they are still stale
// and the series maxt is <= the given max.
func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) {
// The returned references are the series that got deleted.
func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) map[storage.SeriesRef]struct{} {
// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, affected, chunksRemoved := h.series.gcStaleSeries(seriesRefs, maxt)
@ -1766,6 +1768,8 @@ func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) {
}
h.walExpiriesMtx.Unlock()
}
return deleted
}
// Tombstones returns a new reader over the head's tombstones.
@ -2198,7 +2202,8 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
h.tombstones.DeleteTombstones(deleted)
}
// TODO: add comments.
// gcStaleSeries removes all the stale series provided that they are still stale
// and the series maxt is <= the given max.
func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
var (
deleted = map[storage.SeriesRef]struct{}{}

View file

@ -210,6 +210,9 @@ func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef)
}
// headStaleIndexReader gives the stale series that have no out-of-order data.
// This is only used for stale series compaction at the moment, which only asks for all
// the series during compaction. So, to make that efficient, this index reader requires the
// pre-calculated list of stale series refs that can be returned without re-reading the Head.
type headStaleIndexReader struct {
*headIndexReader
staleSeriesRefs []storage.SeriesRef
@ -249,7 +252,7 @@ func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, na
// filterStaleSeriesAndSortPostings returns the stale series references from the given postings
// that also do not have any out-of-order data.
func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) {
series := make([]*memSeries, 0, 128)
series := make([]*memSeries, 0, 1024)
notFoundSeriesCount := 0
for p.Next() {
@ -261,7 +264,8 @@ func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.Ser
s.Lock()
if s.ooo != nil {
// Has out-of-order data; skip it.
// Has out-of-order data; skip it because we cannot determine if a series
// is stale when it's getting out-of-order data.
s.Unlock()
continue
}
@ -284,12 +288,11 @@ func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.Ser
return labels.Compare(a.labels(), b.labels())
})
// Convert back to list.
ep := make([]storage.SeriesRef, 0, len(series))
refs := make([]storage.SeriesRef, 0, len(series))
for _, p := range series {
ep = append(ep, storage.SeriesRef(p.ref))
refs = append(refs, storage.SeriesRef(p.ref))
}
return ep, nil
return refs, nil
}
// SortedPostings returns the postings as it is because we expect any postings obtained via
@ -307,7 +310,7 @@ func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.Se
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside the specified range.
// Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(mint, maxt) {
continue
}

View file

@ -308,14 +308,19 @@ Outer:
}
h.wlReplaySamplesPool.Put(v)
case []tombstones.Stone:
// TODO: what if the ref doesn't match a series directly? Check about multiref!
// Tombstone records will be fairly rare, so not trying to optimise the allocations here.
deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency)
for _, s := range v {
if len(s.Intervals) == 1 && s.Intervals[0].Mint == math.MinInt64 && s.Intervals[0].Maxt == math.MaxInt64 {
// This series was fully deleted at this point.
// This series was fully deleted at this point. This record is only done for stale series at the moment.
mod := uint64(s.Ref) % uint64(concurrency)
deleteSeriesShards[mod] = append(deleteSeriesShards[mod], chunks.HeadSeriesRef(s.Ref))
// If the series is also mapped to a different reference in multiRef, try deleting that one too.
if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok {
mod := uint64(r) % uint64(concurrency)
deleteSeriesShards[mod] = append(deleteSeriesShards[mod], r)
}
continue
}
for _, itv := range s.Intervals {