mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-18 18:25:24 -05:00
fix(tsdb): missing passing head option to wal/wbl write (#18113)
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
d1220defd3
commit
dc8613df54
2 changed files with 116 additions and 2 deletions
|
|
@ -1059,7 +1059,7 @@ func (a *headAppenderBase) log() error {
|
|||
defer func() { a.head.putBytesBuffer(buf) }()
|
||||
|
||||
var rec []byte
|
||||
var enc record.Encoder
|
||||
enc := record.Encoder{EnableSTStorage: a.head.opts.EnableSTStorage.Load()}
|
||||
|
||||
if len(a.seriesRefs) > 0 {
|
||||
rec = enc.Series(a.seriesRefs, buf)
|
||||
|
|
@ -1743,7 +1743,7 @@ func (a *headAppenderBase) Commit() (err error) {
|
|||
samplesPerChunk: h.opts.SamplesPerChunk,
|
||||
},
|
||||
oooEnc: record.Encoder{
|
||||
EnableSTStorage: false,
|
||||
EnableSTStorage: h.opts.EnableSTStorage.Load(),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7267,3 +7267,117 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestHeadAppender_WALEncoder_EnableSTStorage verifies that when EnableSTStorage
|
||||
// is true the WAL encoder writes SamplesV2 records, and when false it writes
|
||||
// plain Samples (V1) records. The bug was that log() always created a zero-value
|
||||
// record.Encoder (EnableSTStorage=false), ignoring the head option.
|
||||
func TestHeadAppender_WALEncoder_EnableSTStorage(t *testing.T) {
|
||||
for _, enableST := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("enableSTStorage=%v", enableST), func(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
|
||||
opts.EnableSTStorage.Store(enableST)
|
||||
h, w := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
app := h.AppenderV2(context.Background())
|
||||
for ts := int64(100); ts < 110; ts++ {
|
||||
_, err := app.Append(0, lbls, 0, ts, float64(ts), nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
require.NoError(t, h.Close())
|
||||
|
||||
// Read WAL segments directly and check the sample record type.
|
||||
sr, err := wlog.NewSegmentsReader(w.Dir())
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, sr.Close()) }()
|
||||
|
||||
dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
|
||||
r := wlog.NewReader(sr)
|
||||
|
||||
var foundSampleRecord bool
|
||||
for r.Next() {
|
||||
rt := dec.Type(r.Record())
|
||||
switch rt {
|
||||
case record.Samples:
|
||||
require.False(t, enableST, "WAL contains Samples (V1) record but EnableSTStorage=true, expected SamplesV2")
|
||||
foundSampleRecord = true
|
||||
case record.SamplesV2:
|
||||
require.True(t, enableST, "WAL contains SamplesV2 record but EnableSTStorage=false, expected Samples (V1)")
|
||||
foundSampleRecord = true
|
||||
}
|
||||
}
|
||||
require.NoError(t, r.Err())
|
||||
require.True(t, foundSampleRecord, "no sample record found in WAL")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestHeadAppender_WBLEncoder_EnableSTStorage verifies that when EnableSTStorage
|
||||
// is true the WBL encoder writes SamplesV2 records for out-of-order samples, and
|
||||
// when false it writes plain Samples (V1) records. The bug was that collectOOORecords()
|
||||
// always created record.Encoder{EnableSTStorage: false}, ignoring the head option.
|
||||
func TestHeadAppender_WBLEncoder_EnableSTStorage(t *testing.T) {
|
||||
for _, enableST := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("enableSTStorage=%v", enableST), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
wbl, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = DefaultBlockDuration
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds())
|
||||
opts.EnableSTStorage.Store(enableST)
|
||||
|
||||
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
t.Cleanup(func() { _ = h.Close() })
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
|
||||
// Append an in-order sample to establish head maxt.
|
||||
app := h.AppenderV2(context.Background())
|
||||
_, err = app.Append(0, lbls, 0, 200, 200, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Append OOO samples; these are written to the WBL.
|
||||
app = h.AppenderV2(context.Background())
|
||||
for ts := int64(100); ts < 110; ts++ {
|
||||
_, err = app.Append(0, lbls, 0, ts, float64(ts), nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
require.NoError(t, h.Close())
|
||||
|
||||
// Read WBL segments directly and check the sample record type.
|
||||
sr, err := wlog.NewSegmentsReader(filepath.Join(dir, wlog.WblDirName))
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, sr.Close()) }()
|
||||
|
||||
dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
|
||||
r := wlog.NewReader(sr)
|
||||
|
||||
var foundSampleRecord bool
|
||||
for r.Next() {
|
||||
rt := dec.Type(r.Record())
|
||||
switch rt {
|
||||
case record.Samples:
|
||||
require.False(t, enableST, "WBL contains Samples (V1) record but EnableSTStorage=true, expected SamplesV2")
|
||||
foundSampleRecord = true
|
||||
case record.SamplesV2:
|
||||
require.True(t, enableST, "WBL contains SamplesV2 record but EnableSTStorage=false, expected Samples (V1)")
|
||||
foundSampleRecord = true
|
||||
}
|
||||
}
|
||||
require.NoError(t, r.Err())
|
||||
require.True(t, foundSampleRecord, "no sample record found in WBL")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue