feat[head]: implement EnableSTStorage for float appending

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-02-17 09:45:34 +00:00
parent d9fbe7b0ae
commit 9db326ad64
5 changed files with 121 additions and 100 deletions

View file

@ -236,7 +236,6 @@ type Options struct {
// EnableSTStorage determines whether TSDB should write a Start Timestamp (ST)
// per sample to WAL.
// TODO(bwplotka): Implement this option as per PROM-60, currently it's noop.
EnableSTStorage bool
// EnableMetadataWALRecords represents 'metadata-wal-records' feature flag.

View file

@ -203,7 +203,6 @@ type HeadOptions struct {
// EnableSTStorage determines whether agent DB should write a Start Timestamp (ST)
// per sample to WAL.
// TODO(bwplotka): Implement this option as per PROM-60, currently it's noop.
EnableSTStorage bool
}

View file

@ -1058,8 +1058,10 @@ func (a *headAppenderBase) log() error {
buf := a.head.getBytesBuffer()
defer func() { a.head.putBytesBuffer(buf) }()
var rec []byte
var enc record.Encoder
var (
rec []byte
enc = record.Encoder{EnableSTStorage: a.head.opts.EnableSTStorage}
)
if len(a.seriesRefs) > 0 {
rec = enc.Series(a.seriesRefs, buf)
@ -1178,7 +1180,7 @@ type appenderCommitContext struct {
oooRecords [][]byte
oooCapMax int64
appendChunkOpts chunkOpts
enc record.Encoder
oooEnc record.Encoder
}
// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage.
@ -1228,31 +1230,31 @@ func (acc *appenderCommitContext) collectOOORecords(a *headAppenderBase) {
})
}
}
r := acc.enc.MmapMarkers(markers, a.head.getBytesBuffer())
r := acc.oooEnc.MmapMarkers(markers, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
if len(acc.wblSamples) > 0 {
r := acc.enc.Samples(acc.wblSamples, a.head.getBytesBuffer())
r := acc.oooEnc.Samples(acc.wblSamples, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
if len(acc.wblHistograms) > 0 {
r, customBucketsHistograms := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer())
r, customBucketsHistograms := acc.oooEnc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer())
if len(r) > 0 {
acc.oooRecords = append(acc.oooRecords, r)
}
if len(customBucketsHistograms) > 0 {
r := acc.enc.CustomBucketsHistogramSamples(customBucketsHistograms, a.head.getBytesBuffer())
r := acc.oooEnc.CustomBucketsHistogramSamples(customBucketsHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
}
if len(acc.wblFloatHistograms) > 0 {
r, customBucketsFloatHistograms := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer())
r, customBucketsFloatHistograms := acc.oooEnc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer())
if len(r) > 0 {
acc.oooRecords = append(acc.oooRecords, r)
}
if len(customBucketsFloatHistograms) > 0 {
r := acc.enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, a.head.getBytesBuffer())
r := acc.oooEnc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
}
@ -1431,7 +1433,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
ok, chunkCreated = series.append(s.ST, s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1540,7 +1542,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
}
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
// TODO(bwplotka): Add support for ST for Histograms.
ok, chunkCreated = series.appendHistogram(0, s.T, s.H, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1649,7 +1652,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
}
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
// TODO(bwplotka): Add support for ST for FloatHistograms.
ok, chunkCreated = series.appendFloatHistogram(0, s.T, s.FH, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1741,9 +1745,10 @@ func (a *headAppenderBase) Commit() (err error) {
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: h.chunkRange.Load(),
samplesPerChunk: h.opts.SamplesPerChunk,
enableSTStorage: h.opts.EnableSTStorage,
},
enc: record.Encoder{
EnableSTStorage: false,
oooEnc: record.Encoder{
EnableSTStorage: a.head.opts.EnableSharding,
},
}
@ -1827,19 +1832,25 @@ type chunkOpts struct {
chunkDiskMapper *chunks.ChunkDiskMapper
chunkRange int64
samplesPerChunk int
enableSTStorage bool
}
// append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
enc := chunkenc.EncXOR
if o.enableSTStorage {
enc = chunkenc.EncXOROptST
}
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, enc, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
// TODO(krajorama): pass ST.
s.app.Append(0, t, v)
s.app.Append(st, t, v)
c.maxTime = t
@ -1859,14 +1870,16 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o)
enc := chunkenc.EncHistogram
// TODO(bwplotka): Implement histogram ST encoding and switch on o.enableSTStorage.
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, enc, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1882,7 +1895,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
}
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed
s.lastHistogramValue = h
s.lastFloatHistogramValue = nil
@ -1917,14 +1930,16 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o)
enc := chunkenc.EncFloatHistogram
// TODO(bwplotka): Implement histogram ST encoding and switch on o.enableSTStorage.
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, enc, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1939,8 +1954,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
prevApp = nil
}
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed.
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed.
s.lastHistogramValue = nil
s.lastFloatHistogramValue = fh

View file

@ -148,10 +148,10 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
switch {
case fh != nil:
isStale = value.IsStaleNaN(fh.Sum)
appErr = a.appendFloatHistogram(s, t, fh, opts.RejectOutOfOrder)
appErr = a.appendFloatHistogram(s, st, t, fh, opts.RejectOutOfOrder)
case h != nil:
isStale = value.IsStaleNaN(h.Sum)
appErr = a.appendHistogram(s, t, h, opts.RejectOutOfOrder)
appErr = a.appendHistogram(s, st, t, h, opts.RejectOutOfOrder)
default:
isStale = value.IsStaleNaN(v)
if isStale {
@ -177,7 +177,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
// we do not need to check for the difference between "unknown
// series" and "known series with stNone".
}
appErr = a.appendFloat(s, t, v, opts.RejectOutOfOrder)
appErr = a.appendFloat(s, st, t, v, opts.RejectOutOfOrder)
}
// Handle append error, if any.
if appErr != nil {
@ -218,7 +218,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
return storage.SeriesRef(s.ref), partialErr
}
func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error {
func (a *headAppenderV2) appendFloat(s *memSeries, st, t int64, v float64, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
@ -239,12 +239,12 @@ func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejec
}
b := a.getCurrentBatch(stFloat, s.ref)
b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: t, V: v})
b.floats = append(b.floats, record.RefSample{Ref: s.ref, ST: st, T: t, V: v})
b.floatSeries = append(b.floatSeries, s)
return nil
}
func (a *headAppenderV2) appendHistogram(s *memSeries, t int64, h *histogram.Histogram, fastRejectOOO bool) error {
func (a *headAppenderV2) appendHistogram(s *memSeries, _, t int64, h *histogram.Histogram, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
@ -263,17 +263,18 @@ func (a *headAppenderV2) appendHistogram(s *memSeries, t int64, h *histogram.His
if err != nil {
return err
}
st := stHistogram
sTyp := stHistogram
if h.UsesCustomBuckets() {
st = stCustomBucketHistogram
sTyp = stCustomBucketHistogram
}
b := a.getCurrentBatch(st, s.ref)
b := a.getCurrentBatch(sTyp, s.ref)
// TODO(bwplotka): Add ST support for RefHistogramSample.
b.histograms = append(b.histograms, record.RefHistogramSample{Ref: s.ref, T: t, H: h})
b.histogramSeries = append(b.histogramSeries, s)
return nil
}
func (a *headAppenderV2) appendFloatHistogram(s *memSeries, t int64, fh *histogram.FloatHistogram, fastRejectOOO bool) error {
func (a *headAppenderV2) appendFloatHistogram(s *memSeries, _, t int64, fh *histogram.FloatHistogram, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
@ -292,11 +293,12 @@ func (a *headAppenderV2) appendFloatHistogram(s *memSeries, t int64, fh *histogr
if err != nil {
return err
}
st := stFloatHistogram
sTyp := stFloatHistogram
if fh.UsesCustomBuckets() {
st = stCustomBucketFloatHistogram
sTyp = stCustomBucketFloatHistogram
}
b := a.getCurrentBatch(st, s.ref)
b := a.getCurrentBatch(sTyp, s.ref)
// TODO(bwplotka): Add ST support for RefFloatHistogramSample.
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{Ref: s.ref, T: t, FH: fh})
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
return nil
@ -354,7 +356,7 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La
ZeroThreshold: fh.ZeroThreshold,
CustomValues: fh.CustomValues,
}
err = a.appendFloatHistogram(s, st, zeroFloatHistogram, true)
err = a.appendFloatHistogram(s, 0, st, zeroFloatHistogram, true)
case h != nil:
zeroHistogram := &histogram.Histogram{
// The STZeroSample represents a counter reset by definition.
@ -364,9 +366,9 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La
ZeroThreshold: h.ZeroThreshold,
CustomValues: h.CustomValues,
}
err = a.appendHistogram(s, st, zeroHistogram, true)
err = a.appendHistogram(s, 0, st, zeroHistogram, true)
default:
err = a.appendFloat(s, st, 0, true)
err = a.appendFloat(s, 0, st, 0, true)
}
if err != nil {

View file

@ -61,19 +61,20 @@ import (
)
// newTestHeadDefaultOptions returns the HeadOptions that should be used by default in unit tests.
func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions {
func newTestHeadDefaultOptions(chunkRange int64, enabledOOO, enabledSTStorage bool) *HeadOptions {
opts := DefaultHeadOptions()
opts.ChunkRange = chunkRange
opts.EnableExemplarStorage = true
opts.EnableSTStorage = enabledSTStorage
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
if oooEnabled {
if enabledOOO {
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
}
return opts
}
func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, oooEnabled bool) (*Head, *wlog.WL) {
return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled))
func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, enabledOOO, enabledSTStorage bool) (*Head, *wlog.WL) {
return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, enabledOOO, enabledSTStorage))
}
func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *HeadOptions) (*Head, *wlog.WL) {
@ -102,7 +103,7 @@ func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *He
func BenchmarkCreateSeries(b *testing.B) {
series := genSeries(b.N, 10, 0, 0)
h, _ := newTestHead(b, 10000, compression.None, false)
h, _ := newTestHead(b, 10000, compression.None, false, false)
b.ReportAllocs()
b.ResetTimer()
@ -256,7 +257,7 @@ func BenchmarkLoadWLs(b *testing.B) {
// Rough estimates of most common % of samples that have an exemplar for each scrape.
exemplarsPercentages := []float64{0, 0.5, 1, 5}
lastExemplarsPerSeries := -1
for _, enableStStorage := range []bool{false, true} {
for _, enableSTStorage := range []bool{false, true} {
for _, c := range cases {
missingSeriesPercentages := []float64{0, 0.1}
for _, missingSeriesPct := range missingSeriesPercentages {
@ -268,7 +269,7 @@ func BenchmarkLoadWLs(b *testing.B) {
continue
}
lastExemplarsPerSeries = exemplarsPerSeries
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d,missingSeriesPct=%.3f,stStorage=%v", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct, enableStStorage),
b.Run(fmt.Sprintf("batches=%d/seriesPerBatch=%d/samplesPerSeries=%d/exemplarsPerSeries=%d/mmappedChunkT=%d/oooSeriesPct=%.3f/oooSamplesPct=%.3f/oooCapMax=%d/missingSeriesPct=%.3f/stStorage=%v", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct, enableSTStorage),
func(b *testing.B) {
dir := b.TempDir()
@ -307,7 +308,7 @@ func BenchmarkLoadWLs(b *testing.B) {
writeSeries = newWriteSeries
}
buf = populateTestWL(b, wal, []any{writeSeries}, buf, enableStStorage)
buf = populateTestWL(b, wal, []any{writeSeries}, buf, enableSTStorage)
}
// Write samples.
@ -333,7 +334,7 @@ func BenchmarkLoadWLs(b *testing.B) {
V: float64(i) * 100,
})
}
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableStStorage)
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableSTStorage)
}
}
@ -345,11 +346,12 @@ func BenchmarkLoadWLs(b *testing.B) {
chunkDiskMapper: chunkDiskMapper,
chunkRange: c.mmappedChunkT,
samplesPerChunk: DefaultSamplesPerChunk,
enableSTStorage: enableSTStorage,
}
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false)
s.append(c.mmappedChunkT, 42, 0, cOpts)
s.append(c.mmappedChunkT, 10, 42, 0, cOpts)
// There's only one head chunk because only a single sample is appended. mmapChunks()
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
// the sample at c.mmappedChunkT is mmapped.
@ -372,7 +374,7 @@ func BenchmarkLoadWLs(b *testing.B) {
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
})
}
buf = populateTestWL(b, wal, []any{refExemplars}, buf, enableStStorage)
buf = populateTestWL(b, wal, []any{refExemplars}, buf, enableSTStorage)
}
}
@ -396,15 +398,16 @@ func BenchmarkLoadWLs(b *testing.B) {
}
refSamples = append(refSamples, record.RefSample{
Ref: ref,
ST: int64(i)*10 - 9,
T: int64(i) * 10,
V: float64(i) * 100,
})
}
if shouldAddMarkers {
populateTestWL(b, wbl, []any{refMarkers}, buf, enableStStorage)
populateTestWL(b, wbl, []any{refMarkers}, buf, enableSTStorage)
}
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableStStorage)
buf = populateTestWL(b, wbl, []any{refSamples}, buf, enableStStorage)
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableSTStorage)
buf = populateTestWL(b, wbl, []any{refSamples}, buf, enableSTStorage)
}
}
@ -415,6 +418,7 @@ func BenchmarkLoadWLs(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.EnableSTStorage = enableSTStorage
if c.oooCapMax > 0 {
opts.OutOfOrderCapMax.Store(c.oooCapMax)
}
@ -477,7 +481,7 @@ func BenchmarkLoadRealWLs(b *testing.B) {
// TestHead_InitAppenderRace_ErrOutOfBounds tests against init races with maxTime vs minTime on empty head concurrent appends.
// See: https://github.com/prometheus/prometheus/pull/17963
func TestHead_InitAppenderRace_ErrOutOfBounds(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false, false)
require.NoError(t, head.Init(0))
ts := timestamp.FromTime(time.Now())
@ -515,7 +519,7 @@ func TestHead_InitAppenderRace_ErrOutOfBounds(t *testing.T) {
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
for _, appV2 := range []bool{false, true} {
t.Run(fmt.Sprintf("appV2=%v", appV2), func(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false, false)
seriesCnt := 1000
readConcurrency := 2
@ -713,9 +717,9 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
}
func TestHead_ReadWAL(t *testing.T) {
for _, enableStStorage := range []bool{false, true} {
for _, enableSTStorage := range []bool{false, true} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
t.Run(fmt.Sprintf("compress=%s/stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
entries := []any{
[]record.RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
@ -723,9 +727,9 @@ func TestHead_ReadWAL(t *testing.T) {
{Ref: 100, Labels: labels.FromStrings("a", "3")},
},
[]record.RefSample{
{Ref: 0, T: 99, V: 1},
{Ref: 10, T: 100, V: 2},
{Ref: 100, T: 100, V: 3},
{Ref: 0, ST: 1, T: 99, V: 1},
{Ref: 10, ST: 1, T: 100, V: 2},
{Ref: 100, ST: 1, T: 100, V: 3},
},
[]record.RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "4")},
@ -733,10 +737,10 @@ func TestHead_ReadWAL(t *testing.T) {
{Ref: 101, Labels: labels.FromStrings("a", "3")},
},
[]record.RefSample{
{Ref: 10, T: 101, V: 5},
{Ref: 50, T: 101, V: 6},
{Ref: 10, ST: 100, T: 101, V: 5},
{Ref: 50, ST: 100, T: 101, V: 6},
// Sample for duplicate series record.
{Ref: 101, T: 101, V: 7},
{Ref: 101, ST: 100, T: 101, V: 7},
},
[]tombstones.Stone{
{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
@ -754,9 +758,9 @@ func TestHead_ReadWAL(t *testing.T) {
},
}
head, w := newTestHead(t, 1000, compress, false)
head, w := newTestHead(t, 1000, compress, false, enableSTStorage)
populateTestWL(t, w, entries, nil, enableStStorage)
populateTestWL(t, w, entries, nil, enableSTStorage)
require.NoError(t, head.Init(math.MinInt64))
require.Equal(t, uint64(101), head.lastSeriesID.Load())
@ -794,7 +798,10 @@ func TestHead_ReadWAL(t *testing.T) {
// Verify samples and exemplar for series 10.
c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
require.NoError(t, err)
require.Equal(t, []sample{{0, 100, 2, nil, nil}, {0, 101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
require.Equal(t, []sample{
{1, 100, 2, nil, nil},
{100, 101, 5, nil, nil},
}, expandChunk(c.chunk.Iterator(nil)))
q, err := head.ExemplarQuerier(context.Background())
require.NoError(t, err)
@ -837,7 +844,7 @@ func TestHead_ReadWAL(t *testing.T) {
}
func TestHead_WALMultiRef(t *testing.T) {
head, w := newTestHead(t, 1000, compression.None, false)
head, w := newTestHead(t, 1000, compression.None, false, false)
require.NoError(t, head.Init(0))
@ -912,9 +919,9 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) {
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 1, T: 100, V: 1},
{Ref: 2, T: 200, V: 2},
{Ref: 2, T: 500, V: 3},
{Ref: 1, ST: 1, T: 100, V: 1},
{Ref: 2, ST: 101, T: 200, V: 2},
{Ref: 2, ST: 101, T: 500, V: 3},
},
},
expectedWalExpiry: 500,
@ -925,7 +932,7 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) {
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 2, T: 500, V: 3},
{Ref: 2, ST: 101, T: 500, V: 3},
},
},
},
@ -1038,7 +1045,7 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) {
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 2, T: 500, V: 3},
{Ref: 2, ST: 101, T: 500, V: 3},
},
[]tombstones.Stone{
{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}},
@ -1074,8 +1081,8 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) {
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 2, T: 500, V: 2},
{Ref: 1, T: 900, V: 3},
{Ref: 2, ST: 101, T: 500, V: 2},
{Ref: 1, ST: 501, T: 900, V: 3},
},
[]tombstones.Stone{
{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 750}}},
@ -1097,17 +1104,17 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) {
{Ref: 1, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 1, T: 900, V: 3},
{Ref: 1, ST: 501, T: 900, V: 3},
},
},
},
}
for _, enableStStorage := range []bool{false, true} {
for _, enableSTStorage := range []bool{false, true} {
for _, tc := range cases {
t.Run(tc.name+",stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
h, w := newTestHead(t, 1000, compression.None, false)
populateTestWL(t, w, tc.walEntries, nil, enableStStorage)
t.Run(fmt.Sprintf("case=%v/stStorage=%v", tc.name, enableSTStorage), func(t *testing.T) {
h, w := newTestHead(t, 1000, compression.None, false, enableSTStorage)
populateTestWL(t, w, tc.walEntries, nil, enableSTStorage)
first, _, err := wlog.Segments(w.Dir())
require.NoError(t, err)
@ -1181,7 +1188,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
h, _ := newTestHead(t, 1000, compression.None, false)
h, _ := newTestHead(t, 1000, compression.None, false, false)
if tc.prepare != nil {
tc.prepare(t, h)
@ -1690,9 +1697,9 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
}
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
for _, enableStStorage := range []bool{false, true} {
for _, enableSTStorage := range []bool{false, true} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
entries := []any{
[]record.RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
@ -1708,7 +1715,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
}
head, w := newTestHead(t, 1000, compress, false)
populateTestWL(t, w, entries, nil, enableStStorage)
populateTestWL(t, w, entries, nil, enableSTStorage)
require.NoError(t, head.Init(math.MinInt64))
@ -2575,8 +2582,8 @@ func TestHead_ReturnsSortedLabelValues(t *testing.T) {
// TestWalRepair_DecodingError ensures that a repair is run for an error
// when decoding a record.
func TestWalRepair_DecodingError(t *testing.T) {
for _, enableStStorage := range []bool{false, true} {
enc := record.Encoder{EnableSTStorage: enableStStorage}
for _, enableSTStorage := range []bool{false, true} {
enc := record.Encoder{EnableSTStorage: enableSTStorage}
for name, test := range map[string]struct {
corrFunc func(rec []byte) []byte // Func that applies the corruption to a record.
rec []byte
@ -2609,7 +2616,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
},
} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("%s,compress=%s,stStorage=%v", name, compress, enableStStorage), func(t *testing.T) {
t.Run(fmt.Sprintf("%s,compress=%s,stStorage=%v", name, compress, enableSTStorage), func(t *testing.T) {
dir := t.TempDir()
// Fill the wal and corrupt it.
@ -2672,9 +2679,9 @@ func TestWalRepair_DecodingError(t *testing.T) {
// TestWblRepair_DecodingError ensures that a repair is run for an error
// when decoding a record.
func TestWblRepair_DecodingError(t *testing.T) {
for _, enableStStorage := range []bool{false, true} {
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
enc := record.Encoder{EnableSTStorage: enableStStorage}
for _, enableSTStorage := range []bool{false, true} {
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
enc := record.Encoder{EnableSTStorage: enableSTStorage}
corrFunc := func(rec []byte) []byte {
return rec[:3]
}
@ -4378,8 +4385,8 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
}
func TestChunkSnapshot(t *testing.T) {
for _, enableStStorage := range []bool{false, true} {
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
for _, enableSTStorage := range []bool{false, true} {
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
head, _ := newTestHead(t, 120*4, compression.None, false)
defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false
@ -4525,7 +4532,7 @@ func TestChunkSnapshot(t *testing.T) {
require.NoError(t, app.Commit())
// Add some tombstones.
enc := record.Encoder{EnableSTStorage: enableStStorage}
enc := record.Encoder{EnableSTStorage: enableSTStorage}
for i := 1; i <= numSeries; i++ {
ref := storage.SeriesRef(i)
itvs := tombstones.Intervals{
@ -4599,7 +4606,7 @@ func TestChunkSnapshot(t *testing.T) {
require.NoError(t, app.Commit())
// Add more tombstones.
enc := record.Encoder{EnableSTStorage: enableStStorage}
enc := record.Encoder{EnableSTStorage: enableSTStorage}
for i := 1; i <= numSeries; i++ {
ref := storage.SeriesRef(i)
itvs := tombstones.Intervals{
@ -5392,8 +5399,8 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
// Tests https://github.com/prometheus/prometheus/issues/9725.
func TestChunkSnapshotReplayBug(t *testing.T) {
for _, enableStStorage := range []bool{false, true} {
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
for _, enableSTStorage := range []bool{false, true} {
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
@ -5418,7 +5425,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
}
// Add a sample so that the series is not garbage collected.
samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000}
enc := record.Encoder{EnableSTStorage: enableStStorage}
enc := record.Encoder{EnableSTStorage: enableSTStorage}
rec := enc.Series([]record.RefSeries{seriesRec}, buf)
buf = rec[:0]