mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
Prevent duplicate SeriesRefs from being lost in db stripeSeries
Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>
This commit is contained in:
parent
53f89055c3
commit
920ee7f99d
4 changed files with 105 additions and 42 deletions
|
|
@ -403,7 +403,7 @@ func (db *DB) replayWAL() error {
|
|||
return fmt.Errorf("find last checkpoint: %w", err)
|
||||
}
|
||||
|
||||
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
||||
duplicateRefToValidRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
||||
|
||||
if err == nil {
|
||||
sr, err := wlog.NewSegmentsReader(dir)
|
||||
|
|
@ -418,7 +418,7 @@ func (db *DB) replayWAL() error {
|
|||
|
||||
// A corrupted checkpoint is a hard error for now and requires user
|
||||
// intervention. There's likely little data that can be recovered anyway.
|
||||
if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
|
||||
if err := db.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, startFrom); err != nil {
|
||||
return fmt.Errorf("backfill checkpoint: %w", err)
|
||||
}
|
||||
startFrom++
|
||||
|
|
@ -439,7 +439,7 @@ func (db *DB) replayWAL() error {
|
|||
}
|
||||
|
||||
sr := wlog.NewSegmentBufReader(seg)
|
||||
err = db.loadWAL(wlog.NewReader(sr), multiRef)
|
||||
err = db.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, i)
|
||||
if err := sr.Close(); err != nil {
|
||||
db.logger.Warn("error while closing the wal segments reader", "err", err)
|
||||
}
|
||||
|
|
@ -462,7 +462,7 @@ func (db *DB) resetWALReplayResources() {
|
|||
db.walReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{}
|
||||
}
|
||||
|
||||
func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
|
||||
func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, currentSegmentOrCheckpoint int) (err error) {
|
||||
var (
|
||||
syms = labels.NewSymbolTable() // One table for the whole WAL.
|
||||
dec = record.NewDecoder(syms, db.logger)
|
||||
|
|
@ -547,29 +547,38 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
switch v := d.(type) {
|
||||
case []record.RefSeries:
|
||||
for _, entry := range v {
|
||||
// If this is a new series, create it in memory. If we never read in a
|
||||
// sample for this series, its timestamp will remain at 0 and it will
|
||||
// be deleted at the next GC.
|
||||
if db.series.GetByID(entry.Ref) == nil {
|
||||
series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0}
|
||||
db.series.Set(entry.Labels.Hash(), series)
|
||||
multiRef[entry.Ref] = series.ref
|
||||
// Make sure we don't try to reuse a Ref that already exists in the WAL.
|
||||
if entry.Ref > lastRef {
|
||||
lastRef = entry.Ref
|
||||
}
|
||||
|
||||
series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0}
|
||||
series, created := db.series.GetOrSet(series.lset.Hash(), series)
|
||||
|
||||
if !created {
|
||||
duplicateRefToValidRef[entry.Ref] = series.ref
|
||||
// Make sure we keep the duplicate SeriesRef while it exists in the WAL
|
||||
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
|
||||
} else {
|
||||
db.metrics.numActiveSeries.Inc()
|
||||
if entry.Ref > lastRef {
|
||||
lastRef = entry.Ref
|
||||
}
|
||||
}
|
||||
}
|
||||
db.walReplaySeriesPool.Put(v)
|
||||
case []record.RefSample:
|
||||
for _, entry := range v {
|
||||
// Update the lastTs for the series based
|
||||
ref, ok := multiRef[entry.Ref]
|
||||
if !ok {
|
||||
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
|
||||
// Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment.
|
||||
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
|
||||
entry.Ref = ref
|
||||
}
|
||||
|
||||
series := db.series.GetByID(entry.Ref)
|
||||
if series == nil {
|
||||
nonExistentSeriesRefs.Inc()
|
||||
continue
|
||||
}
|
||||
series := db.series.GetByID(ref)
|
||||
|
||||
// Update the lastTs for the series if this sample is newer.
|
||||
if entry.T > series.lastTs {
|
||||
series.lastTs = entry.T
|
||||
}
|
||||
|
|
@ -577,13 +586,18 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
db.walReplaySamplesPool.Put(v)
|
||||
case []record.RefHistogramSample:
|
||||
for _, entry := range v {
|
||||
// Update the lastTs for the series based
|
||||
ref, ok := multiRef[entry.Ref]
|
||||
if !ok {
|
||||
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
|
||||
// Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment.
|
||||
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
|
||||
entry.Ref = ref
|
||||
}
|
||||
series := db.series.GetByID(entry.Ref)
|
||||
if series == nil {
|
||||
nonExistentSeriesRefs.Inc()
|
||||
continue
|
||||
}
|
||||
series := db.series.GetByID(ref)
|
||||
|
||||
// Update the lastTs for the series if this sample is newer.
|
||||
if entry.T > series.lastTs {
|
||||
series.lastTs = entry.T
|
||||
}
|
||||
|
|
@ -591,13 +605,18 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
db.walReplayHistogramsPool.Put(v)
|
||||
case []record.RefFloatHistogramSample:
|
||||
for _, entry := range v {
|
||||
// Update the lastTs for the series based
|
||||
ref, ok := multiRef[entry.Ref]
|
||||
if !ok {
|
||||
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
|
||||
// Make sure we keep the duplicate SeriesRef in checkpoints until we get past the current segment.
|
||||
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
|
||||
entry.Ref = ref
|
||||
}
|
||||
series := db.series.GetByID(entry.Ref)
|
||||
if series == nil {
|
||||
nonExistentSeriesRefs.Inc()
|
||||
continue
|
||||
}
|
||||
series := db.series.GetByID(ref)
|
||||
|
||||
// Update the lastTs for the series if this sample is newer.
|
||||
if entry.T > series.lastTs {
|
||||
series.lastTs = entry.T
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1325,7 +1325,6 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
|
|||
func TestDuplicateSeriesRefsByHash(t *testing.T) {
|
||||
dbDir := t.TempDir()
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderTimeWindow = 360_000
|
||||
db := createTestAgentDB(t, nil, opts, dbDir)
|
||||
|
||||
app := db.Appender(context.Background())
|
||||
|
|
@ -1344,19 +1343,19 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Forcefully create a bunch of new segments to force a truncation
|
||||
// Forcefully create a bunch of new segments to force a truncation.
|
||||
for i := 0; i < 3; i++ {
|
||||
_, err := db.wal.NextSegmentSync()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// No series should be deleted yet
|
||||
require.Equal(t, 0, len(db.deleted))
|
||||
require.Empty(t, db.deleted)
|
||||
|
||||
// Truncate at 1 ms higher than the highest timestamp
|
||||
// Truncate at 1 ms higher than the highest timestamp.
|
||||
err := db.truncate(11)
|
||||
require.NoError(t, err)
|
||||
|
||||
// The original SeriesRefs should be considered deleted
|
||||
// The original SeriesRefs should be considered deleted.
|
||||
for _, ref := range originalSeriesRefs {
|
||||
require.Nil(t, db.series.GetByID(ref))
|
||||
require.Contains(t, db.deleted, ref)
|
||||
|
|
@ -1371,22 +1370,22 @@ func TestDuplicateSeriesRefsByHash(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// The duplicate SeriesRefs should be in series
|
||||
// The duplicate SeriesRefs should be in series.
|
||||
for _, ref := range duplicateSeriesRefs {
|
||||
require.NotNil(t, db.series.GetByID(ref))
|
||||
}
|
||||
|
||||
// Close the WAL before we have a chance to remove the original RefIDs
|
||||
// Close the WAL before we have a chance to remove the original RefIDs.
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
db = createTestAgentDB(t, nil, opts, dbDir)
|
||||
// The original SeriesRefs should be in series
|
||||
// The original SeriesRefs should be in series.
|
||||
for _, ref := range originalSeriesRefs {
|
||||
require.NotNil(t, db.series.GetByID(ref))
|
||||
require.NotContains(t, db.deleted, ref)
|
||||
}
|
||||
|
||||
// The duplicated SeriesRefs should be considered deleted
|
||||
// The duplicated SeriesRefs should be considered deleted.
|
||||
for _, ref := range duplicateSeriesRefs {
|
||||
require.Nil(t, db.series.GetByID(ref))
|
||||
require.Contains(t, db.deleted, ref)
|
||||
|
|
|
|||
|
|
@ -182,7 +182,7 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
|
|||
|
||||
// The series is stale. We need to obtain a second lock for the
|
||||
// ref if it's different than the hash lock.
|
||||
refLock := int(series.ref) & (s.size - 1)
|
||||
refLock := int(s.refLock(series.ref))
|
||||
if hashLock != refLock {
|
||||
s.locks[refLock].Lock()
|
||||
}
|
||||
|
|
@ -220,14 +220,14 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
|
|||
}
|
||||
|
||||
func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries {
|
||||
refLock := uint64(id) & uint64(s.size-1)
|
||||
refLock := s.refLock(id)
|
||||
s.locks[refLock].RLock()
|
||||
defer s.locks[refLock].RUnlock()
|
||||
return s.series[refLock][id]
|
||||
}
|
||||
|
||||
func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
|
||||
hashLock := hash & uint64(s.size-1)
|
||||
hashLock := s.hashLock(hash)
|
||||
|
||||
s.locks[hashLock].RLock()
|
||||
defer s.locks[hashLock].RUnlock()
|
||||
|
|
@ -236,8 +236,8 @@ func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
|
|||
|
||||
func (s *stripeSeries) Set(hash uint64, series *memSeries) {
|
||||
var (
|
||||
hashLock = hash & uint64(s.size-1)
|
||||
refLock = uint64(series.ref) & uint64(s.size-1)
|
||||
hashLock = s.hashLock(hash)
|
||||
refLock = s.refLock(series.ref)
|
||||
)
|
||||
|
||||
// We can't hold both locks at once otherwise we might deadlock with a
|
||||
|
|
@ -254,8 +254,31 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) {
|
|||
s.locks[hashLock].Unlock()
|
||||
}
|
||||
|
||||
// GetOrSet returns the existing series for the given label set, or sets it if it does not exist.
|
||||
// It returns the series and a boolean indicating whether it was newly created.
|
||||
func (s *stripeSeries) GetOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
|
||||
hashLock := s.hashLock(hash)
|
||||
|
||||
s.locks[hashLock].Lock()
|
||||
// If it already exists in hashes, return it.
|
||||
if prev := s.hashes[hashLock].Get(hash, series.lset); prev != nil {
|
||||
s.locks[hashLock].Unlock()
|
||||
return prev, false
|
||||
}
|
||||
s.hashes[hashLock].Set(hash, series)
|
||||
s.locks[hashLock].Unlock()
|
||||
|
||||
refLock := s.refLock(series.ref)
|
||||
|
||||
s.locks[refLock].Lock()
|
||||
s.series[refLock][series.ref] = series
|
||||
s.locks[refLock].Unlock()
|
||||
|
||||
return series, true
|
||||
}
|
||||
|
||||
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
|
||||
i := uint64(ref) & uint64(s.size-1)
|
||||
i := s.refLock(ref)
|
||||
|
||||
s.locks[i].RLock()
|
||||
exemplar := s.exemplars[i][ref]
|
||||
|
|
@ -265,7 +288,7 @@ func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exe
|
|||
}
|
||||
|
||||
func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
|
||||
i := uint64(ref) & uint64(s.size-1)
|
||||
i := s.refLock(ref)
|
||||
|
||||
// Make sure that's a valid series id and record its latest exemplar
|
||||
s.locks[i].Lock()
|
||||
|
|
@ -274,3 +297,11 @@ func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exe
|
|||
}
|
||||
s.locks[i].Unlock()
|
||||
}
|
||||
|
||||
func (s *stripeSeries) hashLock(hash uint64) uint64 {
|
||||
return hash & uint64(s.size-1)
|
||||
}
|
||||
|
||||
func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 {
|
||||
return uint64(ref) & uint64(s.size-1)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -122,6 +122,20 @@ func TestStripeSeries_Get(t *testing.T) {
|
|||
require.Same(t, ms2, got)
|
||||
}
|
||||
|
||||
func TestStripeSeries_GetOrSet(t *testing.T) {
|
||||
lbl := labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl")
|
||||
|
||||
ss := newStripeSeries(1)
|
||||
|
||||
ms, created := ss.GetOrSet(lbl.Hash(), &memSeries{ref: chunks.HeadSeriesRef(1), lset: lbl})
|
||||
require.True(t, created)
|
||||
require.Equal(t, lbl, ms.lset)
|
||||
|
||||
ms2, created := ss.GetOrSet(lbl.Hash(), &memSeries{ref: chunks.HeadSeriesRef(2), lset: lbl})
|
||||
require.False(t, created)
|
||||
require.Equal(t, ms, ms2)
|
||||
}
|
||||
|
||||
func TestStripeSeries_gc(t *testing.T) {
|
||||
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
|
||||
hash := ms1.lset.Hash()
|
||||
|
|
|
|||
Loading…
Reference in a new issue