From 687502287389791c8b2f9ebdf20fe8150aaa6a3d Mon Sep 17 00:00:00 2001 From: Patryk Prus Date: Fri, 8 Aug 2025 12:52:34 -0400 Subject: [PATCH] Update head.walExpiries with record timestamps during WAL replay Signed-off-by: Patryk Prus --- tsdb/head.go | 5 +- tsdb/head_test.go | 254 +++++++++++++++++++++++++++++++++++++++++++++- tsdb/head_wal.go | 12 ++- 3 files changed, 266 insertions(+), 5 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index c710ae8e71..a533b529db 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1280,11 +1280,12 @@ func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int64, bool) { return keepUntil, ok } -func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) { +// updateWALExpiry updates the WAL expiry for a series, keeping the higher of the current value and keepUntil. +func (h *Head) updateWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) { h.walExpiriesMtx.Lock() defer h.walExpiriesMtx.Unlock() - h.walExpiries[id] = keepUntil + h.walExpiries[id] = max(keepUntil, h.walExpiries[id]) } // keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d3e544b69e..934378f844 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -162,6 +162,10 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}, buf []byte) [] buf = enc.Tombstones(v, buf) case []record.RefExemplar: buf = enc.Exemplars(v, buf) + case []record.RefHistogramSample: + buf, _ = enc.HistogramSamples(v, buf) + case []record.RefFloatHistogramSample: + buf, _ = enc.FloatHistogramSamples(v, buf) case []record.RefMmapMarker: buf = enc.MmapMarkers(v, buf) case []record.RefMetadata: @@ -886,6 +890,254 @@ func TestHead_WALMultiRef(t *testing.T) { }}, series) } +func TestHead_WALCheckpointMultiRef(t *testing.T) { + + cases := []struct { + name string + walEntries []interface{} + expectedWalExpiry int64 + walTruncateMinT int64 + expectedWalEntries []interface{} + }{ + { + name: "Samples only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 1, T: 100, V: 1}, + {Ref: 2, T: 200, V: 2}, + {Ref: 2, T: 500, V: 3}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 3}, + }, + }, + }, + { + name: "Tombstones only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []tombstones.Stone{ + {Ref: 1, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}}, + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 200}}}, + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + }, + }, + { + name: "Exemplars only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 1, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")}, + {Ref: 2, T: 200, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + {Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + }, + { + name: "Histograms only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: 100, H: &histogram.Histogram{}}, + {Ref: 2, T: 200, H: &histogram.Histogram{}}, + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + }, + }, + { + name: "Float histograms only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: 100, FH: &histogram.FloatHistogram{}}, + {Ref: 2, T: 200, FH: &histogram.FloatHistogram{}}, + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + }, + { + name: "All record types; keep needed duplicate series record until last record", + // Series with 2 refs and samples for both + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 3}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 800, + walTruncateMinT: 700, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + }, + { + name: "All record types; drop expired duplicate series record", + // Series with 2 refs and samples for both + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 2}, + {Ref: 1, T: 900, V: 3}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 750}}}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 600, H: &histogram.Histogram{}}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 700, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 800, + walTruncateMinT: 900, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 1, T: 900, V: 3}, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + h, w := newTestHead(t, 1000, compression.None, false) + t.Cleanup(func() { + require.NoError(t, h.Close()) + }) + + populateTestWL(t, w, tc.walEntries, nil) + first, _, err := wlog.Segments(w.Dir()) + require.NoError(t, err) + + require.NoError(t, h.Init(0)) + + keepUntil, ok := h.getWALExpiry(2) + require.True(t, ok) + require.Equal(t, tc.expectedWalExpiry, keepUntil) + + // Each truncation creates a new segment, so attempt truncations until a checkpoint is created + for { + h.lastWALTruncationTime.Store(0) // Reset so that it's always time to truncate the WAL + err := h.truncateWAL(tc.walTruncateMinT) + require.NoError(t, err) + f, _, err := wlog.Segments(w.Dir()) + require.NoError(t, err) + if f > first { + break + } + } + + // Read test WAL , checkpoint first + checkpointDir, _, err := wlog.LastCheckpoint(w.Dir()) + require.NoError(t, err) + cprecs := readTestWAL(t, checkpointDir) + recs := readTestWAL(t, w.Dir()) + recs = append(cprecs, recs...) + require.Equal(t, tc.expectedWalEntries, recs) + }) + } +} + func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { existingRef := 1 existingLbls := labels.FromStrings("foo", "bar") @@ -932,7 +1184,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { if tc.prepare != nil { tc.prepare(t, h) } else { - h.setWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil) + h.updateWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil) } kept := h.keepSeriesInWALCheckpoint(chunks.HeadSeriesRef(existingRef), tc.mint) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 9d96882b57..f648abe7ea 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -265,8 +265,6 @@ Outer: } if !created { multiRef[walSeries.Ref] = mSeries.ref - // Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints. - h.setWALExpiry(walSeries.Ref, int64(lastSegment)) } idx := uint64(mSeries.ref) % uint64(concurrency) @@ -292,6 +290,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) @@ -313,6 +313,8 @@ Outer: continue } if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok { + // This is a tombstone for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(chunks.HeadSeriesRef(s.Ref), itv.Maxt) s.Ref = storage.SeriesRef(r) } if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil { @@ -330,6 +332,8 @@ Outer: continue } if r, ok := multiRef[e.Ref]; ok { + // This is an exemplar for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(e.Ref, e.T) e.Ref = r } exemplarsInput <- e @@ -354,6 +358,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) @@ -387,6 +393,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a float histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency)