Update head.walExpiries with record timestamps during WAL replay

Signed-off-by: Patryk Prus <p@trykpr.us>
This commit is contained in:
Patryk Prus 2025-08-08 12:52:34 -04:00
parent 218558f543
commit 6875022873
No known key found for this signature in database
GPG key ID: 795650115CA6A58F
3 changed files with 266 additions and 5 deletions

View file

@ -1280,11 +1280,12 @@ func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int64, bool) {
return keepUntil, ok
}
func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) {
// updateWALExpiry updates the WAL expiry for a series, keeping the higher of the current value and keepUntil.
//
// The expiry therefore never moves backwards: calling this with a keepUntil
// lower than the stored value leaves the stored value untouched. The whole
// read-compare-write sequence runs under walExpiriesMtx.
func (h *Head) updateWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) {
	h.walExpiriesMtx.Lock()
	defer h.walExpiriesMtx.Unlock()

	// The stored value must be read before anything is written for id;
	// assigning keepUntil first would make the max a no-op. An id with no
	// entry yet reads as the zero value, so it is compared against 0.
	h.walExpiries[id] = max(keepUntil, h.walExpiries[id])
}
// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint

View file

@ -162,6 +162,10 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}, buf []byte) []
buf = enc.Tombstones(v, buf)
case []record.RefExemplar:
buf = enc.Exemplars(v, buf)
case []record.RefHistogramSample:
buf, _ = enc.HistogramSamples(v, buf)
case []record.RefFloatHistogramSample:
buf, _ = enc.FloatHistogramSamples(v, buf)
case []record.RefMmapMarker:
buf = enc.MmapMarkers(v, buf)
case []record.RefMetadata:
@ -886,6 +890,254 @@ func TestHead_WALMultiRef(t *testing.T) {
}}, series)
}
// TestHead_WALCheckpointMultiRef writes WAL records in which two series refs
// (1 and 2) carry identical labels, initialises the head from that WAL, and
// then checks two things per case:
//
//  1. the WAL expiry recorded for ref 2 equals the expected timestamp, and
//  2. after truncating the WAL at truncateMinT, the records read back from the
//     latest checkpoint followed by the remaining WAL match wantEntries.
func TestHead_WALCheckpointMultiRef(t *testing.T) {
	// dupSeries builds a fresh series record declaring refs 1 and 2 with the
	// same label set. A new slice is returned on every call so no two table
	// entries share backing storage.
	dupSeries := func() []record.RefSeries {
		return []record.RefSeries{
			{Ref: 1, Labels: labels.FromStrings("a", "1")},
			{Ref: 2, Labels: labels.FromStrings("a", "1")},
		}
	}

	testCases := []struct {
		name         string
		entries      []any // Records written to the WAL before the head is initialised.
		wantExpiry   int64 // Expected WAL expiry for ref 2 after initialisation.
		truncateMinT int64 // mint handed to truncateWAL.
		wantEntries  []any // Expected checkpoint records followed by WAL records.
	}{
		{
			name: "Samples only; keep needed duplicate series record",
			entries: []any{
				dupSeries(),
				[]record.RefSample{
					{Ref: 1, T: 100, V: 1},
					{Ref: 2, T: 200, V: 2},
					{Ref: 2, T: 500, V: 3},
				},
			},
			wantExpiry:   500,
			truncateMinT: 500,
			wantEntries: []any{
				dupSeries(),
				[]record.RefSample{
					{Ref: 2, T: 500, V: 3},
				},
			},
		},
		{
			name: "Tombstones only; keep needed duplicate series record",
			entries: []any{
				dupSeries(),
				[]tombstones.Stone{
					{Ref: 1, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}},
					{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 200}}},
					{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}},
				},
			},
			wantExpiry:   500,
			truncateMinT: 500,
			wantEntries: []any{
				dupSeries(),
				[]tombstones.Stone{
					{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}},
				},
			},
		},
		{
			name: "Exemplars only; keep needed duplicate series record",
			entries: []any{
				dupSeries(),
				[]record.RefExemplar{
					{Ref: 1, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")},
					{Ref: 2, T: 200, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
					{Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")},
				},
			},
			wantExpiry:   500,
			truncateMinT: 500,
			wantEntries: []any{
				dupSeries(),
				[]record.RefExemplar{
					{Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")},
				},
			},
		},
		{
			name: "Histograms only; keep needed duplicate series record",
			entries: []any{
				dupSeries(),
				[]record.RefHistogramSample{
					{Ref: 1, T: 100, H: &histogram.Histogram{}},
					{Ref: 2, T: 200, H: &histogram.Histogram{}},
					{Ref: 2, T: 500, H: &histogram.Histogram{}},
				},
			},
			wantExpiry:   500,
			truncateMinT: 500,
			wantEntries: []any{
				dupSeries(),
				[]record.RefHistogramSample{
					{Ref: 2, T: 500, H: &histogram.Histogram{}},
				},
			},
		},
		{
			name: "Float histograms only; keep needed duplicate series record",
			entries: []any{
				dupSeries(),
				[]record.RefFloatHistogramSample{
					{Ref: 1, T: 100, FH: &histogram.FloatHistogram{}},
					{Ref: 2, T: 200, FH: &histogram.FloatHistogram{}},
					{Ref: 2, T: 500, FH: &histogram.FloatHistogram{}},
				},
			},
			wantExpiry:   500,
			truncateMinT: 500,
			wantEntries: []any{
				dupSeries(),
				[]record.RefFloatHistogramSample{
					{Ref: 2, T: 500, FH: &histogram.FloatHistogram{}},
				},
			},
		},
		{
			name: "All record types; keep needed duplicate series record until last record",
			// Every record type references ref 2; the exemplar carries the
			// highest timestamp (800), which is above the truncation mint.
			entries: []any{
				dupSeries(),
				[]record.RefSample{
					{Ref: 2, T: 500, V: 3},
				},
				[]tombstones.Stone{
					{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}},
				},
				[]record.RefExemplar{
					{Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
				},
				[]record.RefHistogramSample{
					{Ref: 2, T: 500, H: &histogram.Histogram{}},
				},
				[]record.RefFloatHistogramSample{
					{Ref: 2, T: 500, FH: &histogram.FloatHistogram{}},
				},
			},
			wantExpiry:   800,
			truncateMinT: 700,
			wantEntries: []any{
				dupSeries(),
				[]record.RefExemplar{
					{Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
				},
			},
		},
		{
			name: "All record types; drop expired duplicate series record",
			// Ref 2's highest timestamp across all record types is 800, which
			// is below the truncation mint of 900; only ref 1 has a record at 900.
			entries: []any{
				dupSeries(),
				[]record.RefSample{
					{Ref: 2, T: 500, V: 2},
					{Ref: 1, T: 900, V: 3},
				},
				[]tombstones.Stone{
					{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 750}}},
				},
				[]record.RefExemplar{
					{Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
				},
				[]record.RefHistogramSample{
					{Ref: 2, T: 600, H: &histogram.Histogram{}},
				},
				[]record.RefFloatHistogramSample{
					{Ref: 2, T: 700, FH: &histogram.FloatHistogram{}},
				},
			},
			wantExpiry:   800,
			truncateMinT: 900,
			wantEntries: []any{
				[]record.RefSeries{
					{Ref: 1, Labels: labels.FromStrings("a", "1")},
				},
				[]record.RefSample{
					{Ref: 1, T: 900, V: 3},
				},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			head, wal := newTestHead(t, 1000, compression.None, false)
			t.Cleanup(func() {
				require.NoError(t, head.Close())
			})

			// Seed the WAL and remember its first segment before anything is truncated.
			populateTestWL(t, wal, tc.entries, nil)
			firstBefore, _, err := wlog.Segments(wal.Dir())
			require.NoError(t, err)

			require.NoError(t, head.Init(0))

			// Ref 2 must have an expiry recorded, equal to the expected timestamp.
			gotExpiry, found := head.getWALExpiry(2)
			require.True(t, found)
			require.Equal(t, tc.wantExpiry, gotExpiry)

			// Each truncation creates a new segment, so keep truncating until
			// the first segment advances, i.e. a checkpoint has been created.
			for checkpointed := false; !checkpointed; {
				// Reset so that it's always time to truncate the WAL.
				head.lastWALTruncationTime.Store(0)
				require.NoError(t, head.truncateWAL(tc.truncateMinT))

				firstNow, _, err := wlog.Segments(wal.Dir())
				require.NoError(t, err)
				checkpointed = firstNow > firstBefore
			}

			// Read the checkpoint first, then the WAL itself, and compare the
			// concatenation against the expectation.
			checkpointDir, _, err := wlog.LastCheckpoint(wal.Dir())
			require.NoError(t, err)
			fromCheckpoint := readTestWAL(t, checkpointDir)
			fromWAL := readTestWAL(t, wal.Dir())
			require.Equal(t, tc.wantEntries, append(fromCheckpoint, fromWAL...))
		})
	}
}
func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
existingRef := 1
existingLbls := labels.FromStrings("foo", "bar")
@ -932,7 +1184,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
if tc.prepare != nil {
tc.prepare(t, h)
} else {
h.setWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil)
h.updateWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil)
}
kept := h.keepSeriesInWALCheckpoint(chunks.HeadSeriesRef(existingRef), tc.mint)

View file

@ -265,8 +265,6 @@ Outer:
}
if !created {
multiRef[walSeries.Ref] = mSeries.ref
// Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints.
h.setWALExpiry(walSeries.Ref, int64(lastSegment))
}
idx := uint64(mSeries.ref) % uint64(concurrency)
@ -292,6 +290,8 @@ Outer:
continue // Before minValidTime: discard.
}
if r, ok := multiRef[sam.Ref]; ok {
// This is a sample for a duplicate series, so we need to keep the series record at least until this record's timestamp.
h.updateWALExpiry(sam.Ref, sam.T)
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)
@ -313,6 +313,8 @@ Outer:
continue
}
if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok {
// This is a tombstone for a duplicate series, so we need to keep the series record at least until this record's timestamp.
h.updateWALExpiry(chunks.HeadSeriesRef(s.Ref), itv.Maxt)
s.Ref = storage.SeriesRef(r)
}
if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
@ -330,6 +332,8 @@ Outer:
continue
}
if r, ok := multiRef[e.Ref]; ok {
// This is an exemplar for a duplicate series, so we need to keep the series record at least until this record's timestamp.
h.updateWALExpiry(e.Ref, e.T)
e.Ref = r
}
exemplarsInput <- e
@ -354,6 +358,8 @@ Outer:
continue // Before minValidTime: discard.
}
if r, ok := multiRef[sam.Ref]; ok {
// This is a histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp.
h.updateWALExpiry(sam.Ref, sam.T)
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)
@ -387,6 +393,8 @@ Outer:
continue // Before minValidTime: discard.
}
if r, ok := multiRef[sam.Ref]; ok {
// This is a float histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp.
h.updateWALExpiry(sam.Ref, sam.T)
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)