More comments and only reset deleted if the new segment is larger

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>
This commit is contained in:
Kyle Eckhart 2026-02-23 15:34:44 -05:00
parent 8f68b4d409
commit 1e60d7fd3b

View file

@ -556,9 +556,16 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
series, created := db.series.GetOrSet(series.lset.Hash(), series)
if !created {
// We don't need to check whether entry.Ref already exists or whether its value differs from series.ref,
// because GetOrSet enforces that the same labels will always get the same ref. If we did not create a
// new ref, the only ref these labels should ever map to in the WAL is series.ref.
duplicateRefToValidRef[entry.Ref] = series.ref
// Make sure we keep the duplicate SeriesRef while it exists in the WAL
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
} else {
db.metrics.numActiveSeries.Inc()
}
@ -567,8 +574,11 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
case []record.RefSample:
for _, entry := range v {
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment.
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
entry.Ref = ref
}
@ -587,8 +597,11 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
case []record.RefHistogramSample:
for _, entry := range v {
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment.
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
entry.Ref = ref
}
series := db.series.GetByID(entry.Ref)
@ -606,8 +619,11 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
case []record.RefFloatHistogramSample:
for _, entry := range v {
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment.
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
entry.Ref = ref
}
series := db.series.GetByID(entry.Ref)