diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 17bb5de835..e673466ccc 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1059,7 +1059,7 @@ func (a *headAppenderBase) log() error { defer func() { a.head.putBytesBuffer(buf) }() var rec []byte - var enc record.Encoder + enc := record.Encoder{EnableSTStorage: a.head.opts.EnableSTStorage.Load()} if len(a.seriesRefs) > 0 { rec = enc.Series(a.seriesRefs, buf) @@ -1743,7 +1743,7 @@ func (a *headAppenderBase) Commit() (err error) { samplesPerChunk: h.opts.SamplesPerChunk, }, oooEnc: record.Encoder{ - EnableSTStorage: false, + EnableSTStorage: h.opts.EnableSTStorage.Load(), }, } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2cee989e40..91cd742330 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -7267,3 +7267,117 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) { }) } } + +// TestHeadAppender_WALEncoder_EnableSTStorage verifies that when EnableSTStorage +// is true the WAL encoder writes SamplesV2 records, and when false it writes +// plain Samples (V1) records. The bug was that log() always created a zero-value +// record.Encoder (EnableSTStorage=false), ignoring the head option. +func TestHeadAppender_WALEncoder_EnableSTStorage(t *testing.T) { + for _, enableST := range []bool{false, true} { + t.Run(fmt.Sprintf("enableSTStorage=%v", enableST), func(t *testing.T) { + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(enableST) + h, w := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + app := h.AppenderV2(context.Background()) + for ts := int64(100); ts < 110; ts++ { + _, err := app.Append(0, lbls, 0, ts, float64(ts), nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + require.NoError(t, h.Close()) + + // Read WAL segments directly and check the sample record type. + sr, err := wlog.NewSegmentsReader(w.Dir()) + require.NoError(t, err) + defer func() { require.NoError(t, sr.Close()) }() + + dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger()) + r := wlog.NewReader(sr) + + var foundSampleRecord bool + for r.Next() { + rt := dec.Type(r.Record()) + switch rt { + case record.Samples: + require.False(t, enableST, "WAL contains Samples (V1) record but EnableSTStorage=true, expected SamplesV2") + foundSampleRecord = true + case record.SamplesV2: + require.True(t, enableST, "WAL contains SamplesV2 record but EnableSTStorage=false, expected Samples (V1)") + foundSampleRecord = true + } + } + require.NoError(t, r.Err()) + require.True(t, foundSampleRecord, "no sample record found in WAL") + }) + } +} + +// TestHeadAppender_WBLEncoder_EnableSTStorage verifies that when EnableSTStorage +// is true the WBL encoder writes SamplesV2 records for out-of-order samples, and +// when false it writes plain Samples (V1) records. The bug was that collectOOORecords() +// always created record.Encoder{EnableSTStorage: false}, ignoring the head option. +func TestHeadAppender_WBLEncoder_EnableSTStorage(t *testing.T) { + for _, enableST := range []bool{false, true} { + t.Run(fmt.Sprintf("enableSTStorage=%v", enableST), func(t *testing.T) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) + require.NoError(t, err) + wbl, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.None) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkRange = DefaultBlockDuration + opts.ChunkDirRoot = dir + opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds()) + opts.EnableSTStorage.Store(enableST) + + h, err := NewHead(nil, nil, wal, wbl, opts, nil) + require.NoError(t, err) + require.NoError(t, h.Init(0)) + t.Cleanup(func() { _ = h.Close() }) + + lbls := labels.FromStrings("foo", "bar") + + // Append an in-order sample to establish head maxt. + app := h.AppenderV2(context.Background()) + _, err = app.Append(0, lbls, 0, 200, 200, nil, nil, storage.AOptions{}) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Append OOO samples; these are written to the WBL. + app = h.AppenderV2(context.Background()) + for ts := int64(100); ts < 110; ts++ { + _, err = app.Append(0, lbls, 0, ts, float64(ts), nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + require.NoError(t, h.Close()) + + // Read WBL segments directly and check the sample record type. + sr, err := wlog.NewSegmentsReader(filepath.Join(dir, wlog.WblDirName)) + require.NoError(t, err) + defer func() { require.NoError(t, sr.Close()) }() + + dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger()) + r := wlog.NewReader(sr) + + var foundSampleRecord bool + for r.Next() { + rt := dec.Type(r.Record()) + switch rt { + case record.Samples: + require.False(t, enableST, "WBL contains Samples (V1) record but EnableSTStorage=true, expected SamplesV2") + foundSampleRecord = true + case record.SamplesV2: + require.True(t, enableST, "WBL contains SamplesV2 record but EnableSTStorage=false, expected Samples (V1)") + foundSampleRecord = true + } + } + require.NoError(t, r.Err()) + require.True(t, foundSampleRecord, "no sample record found in WBL") + }) + } +}