From 53f89055c30f450b1b32b2c17cc89b638f09fc67 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 14 Nov 2025 16:20:46 -0500 Subject: [PATCH 1/8] Show the agent db can hold duplicate series by hash Signed-off-by: Kyle Eckhart --- tsdb/agent/db_test.go | 80 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 2 deletions(-) diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 31e309d3fd..9ba71721f5 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -88,10 +88,15 @@ func TestDB_InvalidSeries(t *testing.T) { }) } -func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *DB { +func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options, dir ...string) *DB { t.Helper() dbDir := t.TempDir() + + if len(dir) > 0 && dir[0] != "" { + dbDir = dir[0] + } + rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false) t.Cleanup(func() { require.NoError(t, rs.Close()) @@ -1317,7 +1322,78 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) { } } -func readWALSamples(t *testing.T, walDir string) []walSample { +func TestDuplicateSeriesRefsByHash(t *testing.T) { + dbDir := t.TempDir() + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 360_000 + db := createTestAgentDB(t, nil, opts, dbDir) + + app := db.Appender(context.Background()) + + metricNames := []string{"foo", "bar", "baz", "blerg"} + originalSeriesRefs := make([]chunks.HeadSeriesRef, 0, len(metricNames)) + for _, metricName := range metricNames { + lbls := labels.FromMap(map[string]string{"__name__": metricName}) + + ref, err := app.Append(storage.SeriesRef(0), lbls, int64(0), 10.0) + require.NoError(t, err) + originalSeriesRefs = append(originalSeriesRefs, chunks.HeadSeriesRef(ref)) + ref2, err := app.Append(ref, lbls, int64(10), 100.0) + require.NoError(t, err) + require.Equal(t, ref, ref2) + } + require.NoError(t, app.Commit()) + + // Forcefully create a bunch of new segments to force a truncation + for i := 0; i < 3; i++ { + _, err := db.wal.NextSegmentSync() + require.NoError(t, err) + } + // No series should be deleted yet + require.Equal(t, 0, len(db.deleted)) + + // Truncate at 1 ms higher than the highest timestamp + err := db.truncate(11) + require.NoError(t, err) + + // The original SeriesRefs should be considered deleted + for _, ref := range originalSeriesRefs { + require.Nil(t, db.series.GetByID(ref)) + require.Contains(t, db.deleted, ref) + } + + duplicateSeriesRefs := make([]chunks.HeadSeriesRef, 0, len(metricNames)) + for _, metricName := range metricNames { + lbls := labels.FromMap(map[string]string{"__name__": metricName}) + ref, err := app.Append(storage.SeriesRef(0), lbls, int64(20), 10.0) + require.NoError(t, err) + duplicateSeriesRefs = append(duplicateSeriesRefs, chunks.HeadSeriesRef(ref)) + } + require.NoError(t, app.Commit()) + + // The duplicate SeriesRefs should be in series + for _, ref := range duplicateSeriesRefs { + require.NotNil(t, db.series.GetByID(ref)) + } + + // Close the WAL before we have a chance to remove the original RefIDs + require.NoError(t, db.Close()) + + db = createTestAgentDB(t, nil, opts, dbDir) + // The original SeriesRefs should be in series + for _, ref := range originalSeriesRefs { + require.NotNil(t, db.series.GetByID(ref)) + require.NotContains(t, db.deleted, ref) + } + + // The duplicated SeriesRefs should be considered deleted + for _, ref := range duplicateSeriesRefs { + require.Nil(t, db.series.GetByID(ref)) + require.Contains(t, db.deleted, ref) + } +} + +func readWALSamples(t *testing.T, walDir string) []*walSample { t.Helper() sr, err := wlog.NewSegmentsReader(walDir) require.NoError(t, err) From 920ee7f99dea5a774a22419f53600ff133abda9a Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 14 Nov 2025 16:42:16 -0500 Subject: [PATCH 2/8] Prevent duplicate SeriesRefs from being lost in db stripeSeries Signed-off-by: Kyle Eckhart --- tsdb/agent/db.go | 71 +++++++++++++++++++++++++-------------- tsdb/agent/db_test.go | 17 +++++----- tsdb/agent/series.go | 45 +++++++++++++++++++++---- tsdb/agent/series_test.go | 14 ++++++++ 4 files changed, 105 insertions(+), 42 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 460ceb7c04..a6635db338 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -403,7 +403,7 @@ func (db *DB) replayWAL() error { return fmt.Errorf("find last checkpoint: %w", err) } - multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} + duplicateRefToValidRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} if err == nil { sr, err := wlog.NewSegmentsReader(dir) @@ -418,7 +418,7 @@ func (db *DB) replayWAL() error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil { + if err := db.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, startFrom); err != nil { return fmt.Errorf("backfill checkpoint: %w", err) } startFrom++ @@ -439,7 +439,7 @@ func (db *DB) replayWAL() error { } sr := wlog.NewSegmentBufReader(seg) - err = db.loadWAL(wlog.NewReader(sr), multiRef) + err = db.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, i) if err := sr.Close(); err != nil { db.logger.Warn("error while closing the wal segments reader", "err", err) } @@ -462,7 +462,7 @@ func (db *DB) resetWALReplayResources() { db.walReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{} } -func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) { +func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, currentSegmentOrCheckpoint int) (err error) { var ( syms = labels.NewSymbolTable() // One table for the whole WAL. dec = record.NewDecoder(syms, db.logger) @@ -547,29 +547,38 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H switch v := d.(type) { case []record.RefSeries: for _, entry := range v { - // If this is a new series, create it in memory. If we never read in a - // sample for this series, its timestamp will remain at 0 and it will - // be deleted at the next GC. - if db.series.GetByID(entry.Ref) == nil { - series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0} - db.series.Set(entry.Labels.Hash(), series) - multiRef[entry.Ref] = series.ref + // Make sure we don't try to reuse a Ref that already exists in the WAL. + if entry.Ref > lastRef { + lastRef = entry.Ref + } + + series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0} + series, created := db.series.GetOrSet(series.lset.Hash(), series) + + if !created { + duplicateRefToValidRef[entry.Ref] = series.ref + // Make sure we keep the duplicate SeriesRef while it exists in the WAL + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + } else { db.metrics.numActiveSeries.Inc() - if entry.Ref > lastRef { - lastRef = entry.Ref - } } } db.walReplaySeriesPool.Put(v) case []record.RefSample: for _, entry := range v { - // Update the lastTs for the series based - ref, ok := multiRef[entry.Ref] - if !ok { + if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { + // Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment. + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + entry.Ref = ref + } + + series := db.series.GetByID(entry.Ref) + if series == nil { nonExistentSeriesRefs.Inc() continue } - series := db.series.GetByID(ref) + + // Update the lastTs for the series if this sample is newer. if entry.T > series.lastTs { series.lastTs = entry.T } @@ -577,13 +586,18 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H db.walReplaySamplesPool.Put(v) case []record.RefHistogramSample: for _, entry := range v { - // Update the lastTs for the series based - ref, ok := multiRef[entry.Ref] - if !ok { + if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { + // Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment. + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + entry.Ref = ref + } + series := db.series.GetByID(entry.Ref) + if series == nil { nonExistentSeriesRefs.Inc() continue } - series := db.series.GetByID(ref) + + // Update the lastTs for the series if this sample is newer. if entry.T > series.lastTs { series.lastTs = entry.T } @@ -591,13 +605,18 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H db.walReplayHistogramsPool.Put(v) case []record.RefFloatHistogramSample: for _, entry := range v { - // Update the lastTs for the series based - ref, ok := multiRef[entry.Ref] - if !ok { + if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { + // Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment. + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + entry.Ref = ref + } + series := db.series.GetByID(entry.Ref) + if series == nil { nonExistentSeriesRefs.Inc() continue } - series := db.series.GetByID(ref) + + // Update the lastTs for the series if this sample is newer. if entry.T > series.lastTs { series.lastTs = entry.T } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 9ba71721f5..3275f2b057 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -1325,7 +1325,6 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) { func TestDuplicateSeriesRefsByHash(t *testing.T) { dbDir := t.TempDir() opts := DefaultOptions() - opts.OutOfOrderTimeWindow = 360_000 db := createTestAgentDB(t, nil, opts, dbDir) app := db.Appender(context.Background()) @@ -1344,19 +1343,19 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) { } require.NoError(t, app.Commit()) - // Forcefully create a bunch of new segments to force a truncation + // Forcefully create a bunch of new segments to force a truncation. for i := 0; i < 3; i++ { _, err := db.wal.NextSegmentSync() require.NoError(t, err) } // No series should be deleted yet - require.Equal(t, 0, len(db.deleted)) + require.Empty(t, db.deleted) - // Truncate at 1 ms higher than the highest timestamp + // Truncate at 1 ms higher than the highest timestamp. err := db.truncate(11) require.NoError(t, err) - // The original SeriesRefs should be considered deleted + // The original SeriesRefs should be considered deleted. for _, ref := range originalSeriesRefs { require.Nil(t, db.series.GetByID(ref)) require.Contains(t, db.deleted, ref) @@ -1371,22 +1370,22 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) { } require.NoError(t, app.Commit()) - // The duplicate SeriesRefs should be in series + // The duplicate SeriesRefs should be in series. for _, ref := range duplicateSeriesRefs { require.NotNil(t, db.series.GetByID(ref)) } - // Close the WAL before we have a chance to remove the original RefIDs + // Close the WAL before we have a chance to remove the original RefIDs. require.NoError(t, db.Close()) db = createTestAgentDB(t, nil, opts, dbDir) - // The original SeriesRefs should be in series + // The original SeriesRefs should be in series. for _, ref := range originalSeriesRefs { require.NotNil(t, db.series.GetByID(ref)) require.NotContains(t, db.deleted, ref) } - // The duplicated SeriesRefs should be considered deleted + // The duplicated SeriesRefs should be considered deleted. for _, ref := range duplicateSeriesRefs { require.Nil(t, db.series.GetByID(ref)) require.Contains(t, db.deleted, ref) diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index 4eb691bfd5..7315363176 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -182,7 +182,7 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { // The series is stale. We need to obtain a second lock for the // ref if it's different than the hash lock. - refLock := int(series.ref) & (s.size - 1) + refLock := int(s.refLock(series.ref)) if hashLock != refLock { s.locks[refLock].Lock() } @@ -220,14 +220,14 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { } func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries { - refLock := uint64(id) & uint64(s.size-1) + refLock := s.refLock(id) s.locks[refLock].RLock() defer s.locks[refLock].RUnlock() return s.series[refLock][id] } func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries { - hashLock := hash & uint64(s.size-1) + hashLock := s.hashLock(hash) s.locks[hashLock].RLock() defer s.locks[hashLock].RUnlock() @@ -236,8 +236,8 @@ func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries { func (s *stripeSeries) Set(hash uint64, series *memSeries) { var ( - hashLock = hash & uint64(s.size-1) - refLock = uint64(series.ref) & uint64(s.size-1) + hashLock = s.hashLock(hash) + refLock = s.refLock(series.ref) ) // We can't hold both locks at once otherwise we might deadlock with a @@ -254,8 +254,31 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) { s.locks[hashLock].Unlock() } +// GetOrSet returns the existing series for the given label set, or sets it if it does not exist. +// It returns the series and a boolean indicating whether it was newly created. +func (s *stripeSeries) GetOrSet(hash uint64, series *memSeries) (*memSeries, bool) { + hashLock := s.hashLock(hash) + + s.locks[hashLock].Lock() + // If it already exists in hashes, return it. + if prev := s.hashes[hashLock].Get(hash, series.lset); prev != nil { + s.locks[hashLock].Unlock() + return prev, false + } + s.hashes[hashLock].Set(hash, series) + s.locks[hashLock].Unlock() + + refLock := s.refLock(series.ref) + + s.locks[refLock].Lock() + s.series[refLock][series.ref] = series + s.locks[refLock].Unlock() + + return series, true +} + func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar { - i := uint64(ref) & uint64(s.size-1) + i := s.refLock(ref) s.locks[i].RLock() exemplar := s.exemplars[i][ref] @@ -265,7 +288,7 @@ func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exe } func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) { - i := uint64(ref) & uint64(s.size-1) + i := s.refLock(ref) // Make sure that's a valid series id and record its latest exemplar s.locks[i].Lock() @@ -274,3 +297,11 @@ func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exe } s.locks[i].Unlock() } + +func (s *stripeSeries) hashLock(hash uint64) uint64 { + return hash & uint64(s.size-1) +} + +func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 { + return uint64(ref) & uint64(s.size-1) +} diff --git a/tsdb/agent/series_test.go b/tsdb/agent/series_test.go index 4b277b36b7..f8b6431a3e 100644 --- a/tsdb/agent/series_test.go +++ b/tsdb/agent/series_test.go @@ -122,6 +122,20 @@ func TestStripeSeries_Get(t *testing.T) { require.Same(t, ms2, got) } +func TestStripeSeries_GetOrSet(t *testing.T) { + lbl := labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl") + + ss := newStripeSeries(1) + + ms, created := ss.GetOrSet(lbl.Hash(), &memSeries{ref: chunks.HeadSeriesRef(1), lset: lbl}) + require.True(t, created) + require.Equal(t, lbl, ms.lset) + + ms2, created := ss.GetOrSet(lbl.Hash(), &memSeries{ref: chunks.HeadSeriesRef(2), lset: lbl}) + require.False(t, created) + require.Equal(t, ms, ms2) +} + func TestStripeSeries_gc(t *testing.T) { s, ms1, ms2 := stripeSeriesWithCollidingSeries(t) hash := ms1.lset.Hash() From 8f68b4d4098033a9dde6102f85bf61a8ccdd96d3 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Mon, 23 Feb 2026 15:08:15 -0500 Subject: [PATCH 3/8] Drop default initialized value Signed-off-by: Kyle Eckhart --- tsdb/agent/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index a6635db338..46a47a2105 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -552,7 +552,7 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri lastRef = entry.Ref } - series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0} + series := &memSeries{ref: entry.Ref, lset: entry.Labels} series, created := db.series.GetOrSet(series.lset.Hash(), series) if !created { From 1e60d7fd3b807c5d4624d393b36bcd37338898ea Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Mon, 23 Feb 2026 15:34:44 -0500 Subject: [PATCH 4/8] More comments and only reset deleted if the new segment is larger Signed-off-by: Kyle Eckhart --- tsdb/agent/db.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 46a47a2105..93a9d1e712 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -556,9 +556,16 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri series, created := db.series.GetOrSet(series.lset.Hash(), series) if !created { + // We don't need to check if entry.Ref exists / if the value is not series.ref because GetOrSet + // enforces that the same labels will always get the same Ref. If we did not create a new ref + // the only possible ref it should ever be in the WAL is series.ref duplicateRefToValidRef[entry.Ref] = series.ref - // Make sure we keep the duplicate SeriesRef while it exists in the WAL - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + + // We want to track the largest segment where we encountered the duplicate ref, so we can ensure + // it remains in the checkpoint until we get past that segment. + if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + } } else { db.metrics.numActiveSeries.Inc() } @@ -567,8 +574,11 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri case []record.RefSample: for _, entry := range v { if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { - // Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment. - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + // We want to track the largest segment where we encountered the duplicate ref, so we can ensure + // it remains in the checkpoint until we get past that segment. + if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + } entry.Ref = ref } @@ -587,8 +597,11 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri case []record.RefHistogramSample: for _, entry := range v { if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { - // Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment. - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + // We want to track the largest segment where we encountered the duplicate ref, so we can ensure + // it remains in the checkpoint until we get past that segment. + if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + } entry.Ref = ref } series := db.series.GetByID(entry.Ref) @@ -606,8 +619,11 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri case []record.RefFloatHistogramSample: for _, entry := range v { if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { - // Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment. - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + // We want to track the largest segment where we encountered the duplicate ref, so we can ensure + // it remains in the checkpoint until we get past that segment. + if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { + db.deleted[entry.Ref] = currentSegmentOrCheckpoint + } entry.Ref = ref } series := db.series.GetByID(entry.Ref) From f4a15255e47b4f4d4acd6f12382cf8a7a53c283c Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Mon, 23 Feb 2026 15:35:18 -0500 Subject: [PATCH 5/8] Manually manage db/rw to prevent windows test error Signed-off-by: Kyle Eckhart --- tsdb/agent/db_test.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 3275f2b057..dc45c4197b 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -1325,7 +1325,9 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) { func TestDuplicateSeriesRefsByHash(t *testing.T) { dbDir := t.TempDir() opts := DefaultOptions() - db := createTestAgentDB(t, nil, opts, dbDir) + rs1 := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false) + db, err := Open(promslog.NewNopLogger(), nil, rs1, dbDir, opts) + require.NoError(t, err) app := db.Appender(context.Background()) @@ -1344,7 +1346,7 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) { require.NoError(t, app.Commit()) // Forcefully create a bunch of new segments to force a truncation. - for i := 0; i < 3; i++ { + for range 3 { _, err := db.wal.NextSegmentSync() require.NoError(t, err) } @@ -1352,7 +1354,7 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) { require.Empty(t, db.deleted) // Truncate at 1 ms higher than the highest timestamp. - err := db.truncate(11) + err = db.truncate(11) require.NoError(t, err) // The original SeriesRefs should be considered deleted. @@ -1376,9 +1378,17 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) { } // Close the WAL before we have a chance to remove the original RefIDs. + // Both db and rs1 must be closed to release all file handles before + // reopening the same directory — important on Windows. require.NoError(t, db.Close()) + require.NoError(t, rs1.Close()) + + rs2 := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false) + t.Cleanup(func() { require.NoError(t, rs2.Close()) }) + db, err = Open(promslog.NewNopLogger(), nil, rs2, dbDir, opts) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, db.Close()) }) - db = createTestAgentDB(t, nil, opts, dbDir) // The original SeriesRefs should be in series. for _, ref := range originalSeriesRefs { require.NotNil(t, db.series.GetByID(ref)) From 33afbb799a9b655282b5754ae7b07ae78a1e932d Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Mon, 23 Feb 2026 16:08:17 -0500 Subject: [PATCH 6/8] Fix incorrect type from rebase Signed-off-by: Kyle Eckhart --- tsdb/agent/db_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index dc45c4197b..c0d600785c 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -1402,7 +1402,7 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) { } } -func readWALSamples(t *testing.T, walDir string) []*walSample { +func readWALSamples(t *testing.T, walDir string) []walSample { t.Helper() sr, err := wlog.NewSegmentsReader(walDir) require.NoError(t, err) From 9c6b468ae3d864e5299dbe641b2b7b3fee6a9ddb Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Wed, 25 Feb 2026 16:52:07 -0500 Subject: [PATCH 7/8] Use Set in GetOrSet to enforce proper lock ordering Signed-off-by: Kyle Eckhart --- tsdb/agent/series.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index 7315363176..0c9f8203df 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -260,20 +260,13 @@ func (s *stripeSeries) GetOrSet(hash uint64, series *memSeries) (*memSeries, boo hashLock := s.hashLock(hash) s.locks[hashLock].Lock() - // If it already exists in hashes, return it. if prev := s.hashes[hashLock].Get(hash, series.lset); prev != nil { s.locks[hashLock].Unlock() return prev, false } - s.hashes[hashLock].Set(hash, series) s.locks[hashLock].Unlock() - refLock := s.refLock(series.ref) - - s.locks[refLock].Lock() - s.series[refLock][series.ref] = series - s.locks[refLock].Unlock() - + s.Set(hash, series) return series, true } From dca993ed3c5be9941ebf5f5685583128c6dcb737 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Wed, 25 Feb 2026 16:52:37 -0500 Subject: [PATCH 8/8] Missing period and left over refactor Signed-off-by: Kyle Eckhart --- tsdb/agent/db.go | 2 +- tsdb/agent/db_test.go | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 93a9d1e712..f7e83ae7dd 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -558,7 +558,7 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri if !created { // We don't need to check if entry.Ref exists / if the value is not series.ref because GetOrSet // enforces that the same labels will always get the same Ref. If we did not create a new ref - // the only possible ref it should ever be in the WAL is series.ref + // the only possible ref it should ever be in the WAL is series.ref. duplicateRefToValidRef[entry.Ref] = series.ref // We want to track the largest segment where we encountered the duplicate ref, so we can ensure diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index c0d600785c..e6b8cadc22 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -88,15 +88,11 @@ func TestDB_InvalidSeries(t *testing.T) { }) } -func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options, dir ...string) *DB { +func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *DB { t.Helper() dbDir := t.TempDir() - if len(dir) > 0 && dir[0] != "" { - dbDir = dir[0] - } - rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false) t.Cleanup(func() { require.NoError(t, rs.Close())