tsdb/agent: Prevent duplicate SeriesRefs from being lost in stripeSeries (#17538)

* Show that the agent DB can hold duplicate series by hash

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

* Prevent duplicate SeriesRefs from being lost in db stripeSeries

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

* Drop default initialized value

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

* More comments and only reset deleted if the new segment is larger

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

* Manually manage db/rw to prevent Windows test error

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

* Fix incorrect type from rebase

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

* Use Set in GetOrSet to enforce proper lock ordering

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

* Missing period and leftover refactor

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>

---------

Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>
This commit is contained in:
George Krajcsovits 2026-03-10 14:31:48 +01:00 committed by GitHub
commit 7cbe0e3a5c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 187 additions and 33 deletions

View file

@ -403,7 +403,7 @@ func (db *DB) replayWAL() error {
return fmt.Errorf("find last checkpoint: %w", err)
}
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
duplicateRefToValidRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
if err == nil {
sr, err := wlog.NewSegmentsReader(dir)
@ -418,7 +418,7 @@ func (db *DB) replayWAL() error {
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
if err := db.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, startFrom); err != nil {
return fmt.Errorf("backfill checkpoint: %w", err)
}
startFrom++
@ -439,7 +439,7 @@ func (db *DB) replayWAL() error {
}
sr := wlog.NewSegmentBufReader(seg)
err = db.loadWAL(wlog.NewReader(sr), multiRef)
err = db.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, i)
if err := sr.Close(); err != nil {
db.logger.Warn("error while closing the wal segments reader", "err", err)
}
@ -462,7 +462,7 @@ func (db *DB) resetWALReplayResources() {
db.walReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{}
}
func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, currentSegmentOrCheckpoint int) (err error) {
var (
syms = labels.NewSymbolTable() // One table for the whole WAL.
dec = record.NewDecoder(syms, db.logger)
@ -547,29 +547,48 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
switch v := d.(type) {
case []record.RefSeries:
for _, entry := range v {
// If this is a new series, create it in memory. If we never read in a
// sample for this series, its timestamp will remain at 0 and it will
// be deleted at the next GC.
if db.series.GetByID(entry.Ref) == nil {
series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0}
db.series.Set(entry.Labels.Hash(), series)
multiRef[entry.Ref] = series.ref
db.metrics.numActiveSeries.Inc()
if entry.Ref > lastRef {
lastRef = entry.Ref
// Make sure we don't try to reuse a Ref that already exists in the WAL.
if entry.Ref > lastRef {
lastRef = entry.Ref
}
series := &memSeries{ref: entry.Ref, lset: entry.Labels}
series, created := db.series.GetOrSet(series.lset.Hash(), series)
if !created {
// We don't need to check if entry.Ref exists / if the value is not series.ref because GetOrSet
// enforces that the same labels will always get the same Ref. If we did not create a new ref
// the only possible ref it should ever be in the WAL is series.ref.
duplicateRefToValidRef[entry.Ref] = series.ref
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
} else {
db.metrics.numActiveSeries.Inc()
}
}
db.walReplaySeriesPool.Put(v)
case []record.RefSample:
for _, entry := range v {
// Update the lastTs for the series based
ref, ok := multiRef[entry.Ref]
if !ok {
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
entry.Ref = ref
}
series := db.series.GetByID(entry.Ref)
if series == nil {
nonExistentSeriesRefs.Inc()
continue
}
series := db.series.GetByID(ref)
// Update the lastTs for the series if this sample is newer.
if entry.T > series.lastTs {
series.lastTs = entry.T
}
@ -577,13 +596,21 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
db.walReplaySamplesPool.Put(v)
case []record.RefHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
ref, ok := multiRef[entry.Ref]
if !ok {
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
entry.Ref = ref
}
series := db.series.GetByID(entry.Ref)
if series == nil {
nonExistentSeriesRefs.Inc()
continue
}
series := db.series.GetByID(ref)
// Update the lastTs for the series if this sample is newer.
if entry.T > series.lastTs {
series.lastTs = entry.T
}
@ -591,13 +618,21 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
db.walReplayHistogramsPool.Put(v)
case []record.RefFloatHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
ref, ok := multiRef[entry.Ref]
if !ok {
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
}
entry.Ref = ref
}
series := db.series.GetByID(entry.Ref)
if series == nil {
nonExistentSeriesRefs.Inc()
continue
}
series := db.series.GetByID(ref)
// Update the lastTs for the series if this sample is newer.
if entry.T > series.lastTs {
series.lastTs = entry.T
}

View file

@ -92,6 +92,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *
t.Helper()
dbDir := t.TempDir()
rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
t.Cleanup(func() {
require.NoError(t, rs.Close())
@ -1317,6 +1318,86 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
}
}
// TestDuplicateSeriesRefsByHash verifies that when the WAL ends up holding two
// different SeriesRefs for the same label set, reopening the DB keeps the
// first ref as the live series and tracks the later ref in db.deleted.
func TestDuplicateSeriesRefsByHash(t *testing.T) {
	dbDir := t.TempDir()
	opts := DefaultOptions()
	rs1 := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
	db, err := Open(promslog.NewNopLogger(), nil, rs1, dbDir, opts)
	require.NoError(t, err)

	app := db.Appender(context.Background())
	metricNames := []string{"foo", "bar", "baz", "blerg"}
	originalSeriesRefs := make([]chunks.HeadSeriesRef, 0, len(metricNames))
	for _, metricName := range metricNames {
		lbls := labels.FromMap(map[string]string{"__name__": metricName})
		ref, err := app.Append(storage.SeriesRef(0), lbls, int64(0), 10.0)
		require.NoError(t, err)
		originalSeriesRefs = append(originalSeriesRefs, chunks.HeadSeriesRef(ref))

		// Appending with the returned ref must resolve to the same series.
		ref2, err := app.Append(ref, lbls, int64(10), 100.0)
		require.NoError(t, err)
		require.Equal(t, ref, ref2)
	}
	require.NoError(t, app.Commit())

	// Forcefully create a bunch of new segments to force a truncation.
	for range 3 {
		_, err := db.wal.NextSegmentSync()
		require.NoError(t, err)
	}

	// No series should be deleted yet.
	require.Empty(t, db.deleted)

	// Truncate at 1 ms higher than the highest timestamp.
	err = db.truncate(11)
	require.NoError(t, err)

	// The original SeriesRefs should be considered deleted.
	for _, ref := range originalSeriesRefs {
		require.Nil(t, db.series.GetByID(ref))
		require.Contains(t, db.deleted, ref)
	}

	// An Appender must not be reused once Commit has been called, so obtain a
	// fresh one for the second batch instead of recycling the first.
	app = db.Appender(context.Background())
	duplicateSeriesRefs := make([]chunks.HeadSeriesRef, 0, len(metricNames))
	for _, metricName := range metricNames {
		lbls := labels.FromMap(map[string]string{"__name__": metricName})
		ref, err := app.Append(storage.SeriesRef(0), lbls, int64(20), 10.0)
		require.NoError(t, err)
		duplicateSeriesRefs = append(duplicateSeriesRefs, chunks.HeadSeriesRef(ref))
	}
	require.NoError(t, app.Commit())

	// The duplicate SeriesRefs should be in series.
	for _, ref := range duplicateSeriesRefs {
		require.NotNil(t, db.series.GetByID(ref))
	}

	// Close the WAL before we have a chance to remove the original RefIDs.
	// Both db and rs1 must be closed to release all file handles before
	// reopening the same directory — important on Windows.
	require.NoError(t, db.Close())
	require.NoError(t, rs1.Close())

	rs2 := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
	t.Cleanup(func() { require.NoError(t, rs2.Close()) })
	db, err = Open(promslog.NewNopLogger(), nil, rs2, dbDir, opts)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, db.Close()) })

	// The original SeriesRefs should be in series.
	for _, ref := range originalSeriesRefs {
		require.NotNil(t, db.series.GetByID(ref))
		require.NotContains(t, db.deleted, ref)
	}

	// The duplicated SeriesRefs should be considered deleted.
	for _, ref := range duplicateSeriesRefs {
		require.Nil(t, db.series.GetByID(ref))
		require.Contains(t, db.deleted, ref)
	}
}
func readWALSamples(t *testing.T, walDir string) []walSample {
t.Helper()
sr, err := wlog.NewSegmentsReader(walDir)

View file

@ -182,7 +182,7 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
// The series is stale. We need to obtain a second lock for the
// ref if it's different than the hash lock.
refLock := int(series.ref) & (s.size - 1)
refLock := int(s.refLock(series.ref))
if hashLock != refLock {
s.locks[refLock].Lock()
}
@ -220,14 +220,14 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
}
func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries {
refLock := uint64(id) & uint64(s.size-1)
refLock := s.refLock(id)
s.locks[refLock].RLock()
defer s.locks[refLock].RUnlock()
return s.series[refLock][id]
}
func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
hashLock := hash & uint64(s.size-1)
hashLock := s.hashLock(hash)
s.locks[hashLock].RLock()
defer s.locks[hashLock].RUnlock()
@ -236,8 +236,8 @@ func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
func (s *stripeSeries) Set(hash uint64, series *memSeries) {
var (
hashLock = hash & uint64(s.size-1)
refLock = uint64(series.ref) & uint64(s.size-1)
hashLock = s.hashLock(hash)
refLock = s.refLock(series.ref)
)
// We can't hold both locks at once otherwise we might deadlock with a
@ -254,8 +254,24 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) {
s.locks[hashLock].Unlock()
}
// GetOrSet looks up the series stored under hash whose label set matches
// series.lset. If one is found it is returned together with false; otherwise
// series is stored via Set and returned together with true.
//
// The lookup and the insert are separate critical sections: the hash lock is
// released before Set is called, and Set does its own locking.
func (s *stripeSeries) GetOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
	idx := s.hashLock(hash)

	s.locks[idx].Lock()
	existing := s.hashes[idx].Get(hash, series.lset)
	s.locks[idx].Unlock()

	if existing != nil {
		return existing, false
	}

	s.Set(hash, series)
	return series, true
}
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
i := uint64(ref) & uint64(s.size-1)
i := s.refLock(ref)
s.locks[i].RLock()
exemplar := s.exemplars[i][ref]
@ -265,7 +281,7 @@ func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exe
}
func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
i := uint64(ref) & uint64(s.size-1)
i := s.refLock(ref)
// Make sure that's a valid series id and record its latest exemplar
s.locks[i].Lock()
@ -274,3 +290,11 @@ func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exe
}
s.locks[i].Unlock()
}
// hashLock maps a label-set hash to the stripe index used for that hash by
// masking it with size-1.
// NOTE(review): the mask presumably relies on size being a power of two — confirm against newStripeSeries.
func (s *stripeSeries) hashLock(hash uint64) uint64 {
	mask := uint64(s.size - 1)
	return hash & mask
}
// refLock maps a series ref to the stripe index used for that ref by masking
// it with size-1.
// NOTE(review): the mask presumably relies on size being a power of two — confirm against newStripeSeries.
func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 {
	mask := uint64(s.size - 1)
	return uint64(ref) & mask
}

View file

@ -122,6 +122,20 @@ func TestStripeSeries_Get(t *testing.T) {
require.Same(t, ms2, got)
}
// TestStripeSeries_GetOrSet checks that GetOrSet stores a series the first
// time a label set is seen, and that a second call with the same label set
// returns the already-stored series rather than the new candidate.
func TestStripeSeries_GetOrSet(t *testing.T) {
	lbl := labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl")
	ss := newStripeSeries(1)

	ms, created := ss.GetOrSet(lbl.Hash(), &memSeries{ref: chunks.HeadSeriesRef(1), lset: lbl})
	require.True(t, created)
	require.Equal(t, lbl, ms.lset)
	require.Same(t, ms, ss.GetByID(chunks.HeadSeriesRef(1)))

	ms2, created := ss.GetOrSet(lbl.Hash(), &memSeries{ref: chunks.HeadSeriesRef(2), lset: lbl})
	require.False(t, created)
	// Assert pointer identity, not just deep equality: the existing series
	// itself must be handed back.
	require.Same(t, ms, ms2)
	// The rejected candidate must not have been registered under its own ref.
	require.Nil(t, ss.GetByID(chunks.HeadSeriesRef(2)))
}
func TestStripeSeries_gc(t *testing.T) {
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
hash := ms1.lset.Hash()