diff --git a/promql/engine_test.go b/promql/engine_test.go index ca1d5471c1..ea1336569f 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3755,7 +3755,8 @@ func TestHistogramRateWithFloatStaleness(t *testing.T) { app, err = c2.Appender() require.NoError(t, err) - app.Append(0, 20, math.Float64frombits(value.StaleNaN)) + newChunk, _ := app.Append(0, 20, math.Float64frombits(value.StaleNaN)) + require.Nil(t, newChunk) // Make a chunk with two normal histograms that have zero value. h2 := histogram.Histogram{ diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 5da8c8176c..e172d7b5bf 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -1146,7 +1146,8 @@ func buildTestChunks(t *testing.T) []prompb.Chunk { minTimeMs := time for j := range numSamplesPerTestChunk { - a.Append(0, time, float64(i+j)) + newChunk, _ := a.Append(0, time, float64(i+j)) + require.Nil(t, newChunk) time += int64(1000) } diff --git a/storage/series.go b/storage/series.go index bf6df7db3e..f7bac91099 100644 --- a/storage/series.go +++ b/storage/series.go @@ -342,10 +342,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { + st := seriesIter.AtST() if typ != lastType || i >= seriesToChunkEncoderSplit { // Create a new chunk if the sample type changed or too many samples in the current one. chks = appendChunk(chks, mint, maxt, chk) - chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding()) + chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(st != 0)) if err != nil { return errChunksIterator{err: err} } @@ -360,19 +361,24 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { lastType = typ var ( - st, t int64 - v float64 - h *histogram.Histogram - fh *histogram.FloatHistogram + t int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram ) switch typ { case chunkenc.ValFloat: t, v = seriesIter.At() - st = seriesIter.AtST() - app.Append(st, t, v) + newChk, app = app.Append(st, t, v) + if newChk != nil { + chks = appendChunk(chks, mint, maxt, chk) + mint = int64(math.MaxInt64) + // maxt is immediately overwritten below which is why setting it here won't make a difference. + i = 0 + chk = newChk + } case chunkenc.ValHistogram: t, h = seriesIter.AtHistogram(nil) - st = seriesIter.AtST() newChk, recoded, app, err = app.AppendHistogram(nil, st, t, h, false) if err != nil { return errChunksIterator{err: err} @@ -388,7 +394,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { } case chunkenc.ValFloatHistogram: t, fh = seriesIter.AtFloatHistogram(nil) - st = seriesIter.AtST() newChk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, fh, false) if err != nil { return errChunksIterator{err: err} diff --git a/tsdb/chunkenc/benchmark_test.go b/tsdb/chunkenc/benchmark_test.go index 1b9e133c10..482be9e3a1 100644 --- a/tsdb/chunkenc/benchmark_test.go +++ b/tsdb/chunkenc/benchmark_test.go @@ -276,6 +276,15 @@ For profiles: */ func BenchmarkAppender(b *testing.B) { foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) { + samples := make([]triple, len(s.samples)) + copy(samples, s.samples) + if f.stUnsupported { + // If the format does not support ST, zero them out for appending. + for i := range samples { + samples[i].st = 0 + } + } + b.ReportAllocs() for b.Loop() { @@ -285,8 +294,10 @@ func BenchmarkAppender(b *testing.B) { if err != nil { b.Fatalf("get appender: %s", err) } - for _, p := range s.samples { - a.Append(p.st, p.t, p.v) + for _, p := range samples { + // We are ignoring potential new chunk return here, as we'll + // check the number of samples in the chunk after all appends. + _, _ = a.Append(p.st, p.t, p.v) } // NOTE: Some buffered implementations only encode on Bytes(). b.ReportMetric(float64(len(c.Bytes())), "B/chunk") @@ -317,6 +328,15 @@ For profiles: */ func BenchmarkIterator(b *testing.B) { foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) { + samples := make([]triple, len(s.samples)) + copy(samples, s.samples) + if f.stUnsupported { + // If the format does not support ST, zero them out for appending. + for i := range samples { + samples[i].st = 0 + } + } + floatEquals := func(a, b float64) bool { return a == b } @@ -333,8 +353,11 @@ func BenchmarkIterator(b *testing.B) { if err != nil { b.Fatalf("get appender: %s", err) } - for _, p := range s.samples { - a.Append(p.st, p.t, p.v) + for _, p := range samples { + newChunk, _ := a.Append(p.st, p.t, p.v) + if newChunk != nil { + b.Fatalf("unexpected new chunk allocation") + } } // Some chunk implementations might be buffered. Reset to ensure we don't reuse @@ -352,16 +375,8 @@ func BenchmarkIterator(b *testing.B) { if err := it.Err(); err != nil && !errors.Is(err, io.EOF) { require.NoError(b, err) } - expectedSamples := s.samples - if f.stUnsupported { - // If the format does not support ST, zero them out for comparison. - expectedSamples = make([]triple, len(s.samples)) - copy(expectedSamples, s.samples) - for i := range s.samples { - expectedSamples[i].st = 0 - } - } - if diff := cmp.Diff(expectedSamples, got, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" { + + if diff := cmp.Diff(samples, got, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" { b.Fatalf("mismatch (-want +got):\n%s", diff) } diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 246a9aafa8..f2a5187885 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -49,6 +49,14 @@ func (e Encoding) String() string { return "" } +// Compatible reports whether next encoding is compatible with the current +// encoding e. It's true if they are the same or the current encoding supports +// start timestamps and the next encoding is the same encoding without start +// timestamps. +func (e Encoding) Compatible(next Encoding) bool { + return e == next || (e == EncXORST && next == EncXOR) +} + // IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXORST @@ -104,7 +112,10 @@ type Iterable interface { // Appender adds sample with start timestamp, timestamp, and value to a chunk. type Appender interface { - Append(st, t int64, v float64) + // Append appends a sample with start timestamp st, timestamp t, and value v to the chunk. + // Returns a new Chunk if the sample could not be appended to the current + // chunk because ST storage is required but not supported by the current chunk. + Append(st, t int64, v float64) (c Chunk, app Appender) // AppendHistogram and AppendFloatHistogram append a histogram sample to a histogram or float histogram chunk. // Appending a histogram may require creating a completely new chunk or recoding (changing) the current chunk. @@ -189,9 +200,12 @@ func (v ValueType) String() string { } } -func (v ValueType) ChunkEncoding() Encoding { +func (v ValueType) ChunkEncoding(storeST bool) Encoding { switch v { case ValFloat: + if storeST { + return EncXORST + } return EncXOR case ValHistogram: return EncHistogram diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 08825a4b4c..f80da18767 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -70,11 +70,13 @@ func testChunk(t *testing.T, c Chunk, supportsST bool) { require.NoError(t, err) } - app.Append(ts-100, ts, v) expST := int64(0) if supportsST { expST = ts - 100 } + newChunk, _ := app.Append(expST, ts, v) + require.Nil(t, newChunk, "unexpected new chunk allocation") + exp = append(exp, triple{st: expST, t: ts, v: v}) } diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 6af2fa68e2..4158a83e3d 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -195,7 +195,7 @@ func (a *FloatHistogramAppender) NumSamples() int { // Append implements Appender. This implementation panics because normal float // samples must never be appended to a histogram chunk. -func (*FloatHistogramAppender) Append(int64, int64, float64) { +func (*FloatHistogramAppender) Append(int64, int64, float64) (Chunk, Appender) { panic("appended a float sample to a histogram chunk") } diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 4e77f387d3..8ab8f8c230 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -219,7 +219,7 @@ func (a *HistogramAppender) NumSamples() int { // Append implements Appender. This implementation panics because normal float // samples must never be appended to a histogram chunk. -func (*HistogramAppender) Append(int64, int64, float64) { +func (*HistogramAppender) Append(int64, int64, float64) (Chunk, Appender) { panic("appended a float sample to a histogram chunk") } diff --git a/tsdb/chunkenc/st_helper_test.go b/tsdb/chunkenc/st_helper_test.go index 662866de93..563d3e8ff0 100644 --- a/tsdb/chunkenc/st_helper_test.go +++ b/tsdb/chunkenc/st_helper_test.go @@ -30,7 +30,8 @@ func testChunkSTHandling(t *testing.T, vt ValueType, chunkFactory func() Chunk) sampleAppend := func(app Appender, vt ValueType, st, ts int64, v float64) { switch vt { case ValFloat: - app.Append(st, ts, v) + newChunk, _ := app.Append(st, ts, v) + require.Nil(t, newChunk) case ValHistogram: _, recoded, _, err := app.AppendHistogram(nil, st, ts, &histogram.Histogram{Sum: v, Count: uint64(v * 10)}, false) require.NoError(t, err) diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 5a9a59dc22..bbe755e859 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -158,7 +158,18 @@ type xorAppender struct { trailing uint8 } -func (a *xorAppender) Append(_, t int64, v float64) { +func (a *xorAppender) Append(st, t int64, v float64) (Chunk, Appender) { + if st != 0 { + c := NewXORSTChunk() + app, err := c.Appender() + if err != nil { + // This should never happen, we just created the chunk. + panic("unexpected error creating XORST appender: " + err.Error()) + } + app.Append(st, t, v) + return c, app + } + var tDelta uint64 num := binary.BigEndian.Uint16(a.b.bytes()) switch num { @@ -213,6 +224,7 @@ func (a *xorAppender) Append(_, t int64, v float64) { a.v = v binary.BigEndian.PutUint16(a.b.bytes(), num+1) a.tDelta = tDelta + return nil, a } // bitRange returns whether the given integer can be represented by nbits. diff --git a/tsdb/chunkenc/xor_test.go b/tsdb/chunkenc/xor_test.go index b30c65283d..59cbb8cc2e 100644 --- a/tsdb/chunkenc/xor_test.go +++ b/tsdb/chunkenc/xor_test.go @@ -14,6 +14,7 @@ package chunkenc import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -24,7 +25,8 @@ func BenchmarkXorRead(b *testing.B) { app, err := c.Appender() require.NoError(b, err) for i := int64(0); i < 120*1000; i += 1000 { - app.Append(0, i, float64(i)+float64(i)/10+float64(i)/100+float64(i)/1000) + newChunk, _ := app.Append(0, i, float64(i)+float64(i)/10+float64(i)/100+float64(i)/1000) + require.Nil(b, newChunk) } b.ReportAllocs() @@ -40,3 +42,52 @@ func BenchmarkXorRead(b *testing.B) { _, _ = ts, v } } + +func TestXORChunk_AppendST(t *testing.T) { + for stStartAt := range 5 { + t.Run(fmt.Sprintf("start ST at sample %d", stStartAt), func(t *testing.T) { + c := NewXORChunk() + chunks := []Chunk{c} + app, err := c.Appender() + require.NoError(t, err) + + timestamp := func(i int) int64 { return int64((i + 1) * 5000) } + st := func(i int) int64 { + if i >= stStartAt { + return 1000 + } + return 0 + } + for i := range 4 { + newChunk, newApp := app.Append(st(i), timestamp(i), float64(i)) + if i == stStartAt { + require.NotNil(t, newChunk, "expected new chunk allocation") + require.NotEqual(t, app, newApp, "expected new app allocation") + app = newApp + chunks = append(chunks, newChunk) + } else { + require.Nil(t, newChunk, "unexpected new chunk allocation") + require.Equal(t, app, newApp, "unexpected new app allocation") + } + } + if stStartAt < 4 { + require.Len(t, chunks, 2, "expected two chunks to be created") + } else { + require.Len(t, chunks, 1, "expected only one chunk to be created") + } + // Verify samples. + count := 0 + for _, chk := range chunks { + var it Iterator + it = chk.Iterator(it) + for it.Next() != ValNone { + ts, v := it.At() + require.Equal(t, float64(count), v, "value mismatch at timestamp %d", ts) + require.Equal(t, timestamp(count), ts, "timestamp mismatch at count %d", count) + require.Equal(t, st(count), it.AtST(), "ST mismatch at timestamp %d", ts) + count++ + } + } + }) + } +} diff --git a/tsdb/chunkenc/xorst.go b/tsdb/chunkenc/xorst.go index c0f131a935..e871471109 100644 --- a/tsdb/chunkenc/xorst.go +++ b/tsdb/chunkenc/xorst.go @@ -228,7 +228,7 @@ func (it *xorSTIterator) Reset(b []byte) { it.err = nil } -func (a *xorSTAppender) Append(st, t int64, v float64) { +func (a *xorSTAppender) Append(st, t int64, v float64) (Chunk, Appender) { var ( stDelta int64 tDelta uint64 @@ -348,6 +348,7 @@ func (a *xorSTAppender) Append(st, t int64, v float64) { a.numTotal++ binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) + return nil, a } func (it *xorSTIterator) retErr(err error) ValueType { diff --git a/tsdb/chunkenc/xorst_test.go b/tsdb/chunkenc/xorst_test.go index aa77b968de..775f2d4571 100644 --- a/tsdb/chunkenc/xorst_test.go +++ b/tsdb/chunkenc/xorst_test.go @@ -33,7 +33,8 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) { app, err := chunk.Appender() require.NoError(t, err) for i := range afterMax { - app.Append(0, int64(i*10+1), float64(i)*1.5) + newChunk, _ := app.Append(0, int64(i*10+1), float64(i)*1.5) + require.Nil(t, newChunk) } it := chunk.Iterator(nil) @@ -59,7 +60,8 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) { if i == afterMax-1 { st = int64((afterMax - 1) * 10) } - app.Append(st, int64(i*10+1), float64(i)*1.5) + newChunk, _ := app.Append(st, int64(i*10+1), float64(i)*1.5) + require.Nil(t, newChunk) } it := chunk.Iterator(nil) diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index ce4c9d3d78..bb17077a73 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -135,12 +135,12 @@ type Meta struct { } // ChunkFromSamples requires all samples to have the same type. -// TODO(krajorama): test with ST when chunk formats support it. func ChunkFromSamples(s []Sample) (Meta, error) { return ChunkFromSamplesGeneric(SampleSlice(s)) } // ChunkFromSamplesGeneric requires all samples to have the same type. +// Should only be used in tests. func ChunkFromSamplesGeneric(s Samples) (Meta, error) { emptyChunk := Meta{Chunk: chunkenc.NewXORChunk()} mint, maxt := int64(0), int64(0) @@ -153,8 +153,17 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) { return emptyChunk, nil } + // Check if any sample has a non-zero start time (ST). + storeST := false + for i := 0; i < s.Len(); i++ { + if s.Get(i).ST() != 0 { + storeST = true + break + } + } + sampleType := s.Get(0).Type() - c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding()) + c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(storeST)) if err != nil { return Meta{}, err } @@ -165,7 +174,10 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) { for i := 0; i < s.Len(); i++ { switch sampleType { case chunkenc.ValFloat: - ca.Append(s.Get(i).ST(), s.Get(i).T(), s.Get(i).F()) + newChunk, ca = ca.Append(s.Get(i).ST(), s.Get(i).T(), s.Get(i).F()) + if newChunk != nil { + return emptyChunk, errors.New("did not expect to start a second chunk") + } case chunkenc.ValHistogram: newChunk, _, ca, err = ca.AppendHistogram(nil, s.Get(i).ST(), s.Get(i).T(), s.Get(i).H(), false) if err != nil { diff --git a/tsdb/chunks/chunks_test.go b/tsdb/chunks/chunks_test.go index f40f996fde..a1da813aa7 100644 --- a/tsdb/chunks/chunks_test.go +++ b/tsdb/chunks/chunks_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/tsdbutil" ) @@ -58,3 +59,64 @@ func TestWriterWithDefaultSegmentSize(t *testing.T) { require.NoError(t, err) require.Len(t, d, 1, "expected only one segment to be created to hold both chunks") } + +func TestChunkFromSamplesGenericWithST(t *testing.T) { + testCases := []struct { + name string + samples []Sample + }{ + { + name: "all samples have ST==0", + samples: []Sample{ + sample{t: 10, f: 1.0}, + sample{t: 20, f: 2.0}, + sample{t: 30, f: 3.0}, + sample{t: 40, f: 4.0}, + }, + }, + { + name: "all samples have ST!=0", + samples: []Sample{ + sample{st: 5, t: 10, f: 1.0}, + sample{st: 15, t: 20, f: 2.0}, + sample{st: 25, t: 30, f: 3.0}, + sample{st: 35, t: 40, f: 4.0}, + }, + }, + { + name: "half samples have ST and half not", + samples: []Sample{ + sample{t: 10, f: 1.0}, + sample{st: 15, t: 20, f: 2.0}, + sample{t: 30, f: 3.0}, + sample{st: 35, t: 40, f: 4.0}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + meta, err := ChunkFromSamplesGeneric(SampleSlice(tc.samples)) + require.NoError(t, err) + require.NotNil(t, meta.Chunk, "expected a single chunk to be returned") + + // Verify MinTime and MaxTime + require.Equal(t, tc.samples[0].T(), meta.MinTime) + require.Equal(t, tc.samples[len(tc.samples)-1].T(), meta.MaxTime) + + // Iterate through the chunk and verify values + it := meta.Chunk.Iterator(nil) + idx := 0 + for it.Next() != chunkenc.ValNone { + ts, val := it.At() + st := it.AtST() + require.Equal(t, tc.samples[idx].ST(), st, "ST mismatch at index %d", idx) + require.Equal(t, tc.samples[idx].T(), ts, "T mismatch at index %d", idx) + require.Equal(t, tc.samples[idx].F(), val, "F mismatch at index %d", idx) + idx++ + } + require.NoError(t, it.Err()) + require.Equal(t, len(tc.samples), idx, "number of samples mismatch") + }) + } +} diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index c3cbc5a618..94036011ed 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -559,7 +559,8 @@ func randomChunk(t *testing.T) chunkenc.Chunk { app, err := chunk.Appender() require.NoError(t, err) for range length { - app.Append(0, rand.Int63(), rand.Float64()) + // Not checking for new chunk as we supply ST==0 always. + _, _ = app.Append(0, rand.Int63(), rand.Float64()) } return chunk } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index c171079509..977cf7ed3c 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1377,7 +1377,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true. + ok, chunkCreated, mmapRefs = series.insert(0, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1421,7 +1422,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte default: newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V) staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V) - ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) + // TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true. + ok, chunkCreated = series.append(0, s.T, s.V, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1482,7 +1484,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true. + ok, chunkCreated, mmapRefs = series.insert(0, s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1530,7 +1533,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum) } - ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) + // TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true. + ok, chunkCreated = series.appendHistogram(0, s.T, s.H, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1591,7 +1595,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true. + ok, chunkCreated, mmapRefs = series.insert(0, s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1639,7 +1644,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum) } - ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) + // TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true. + ok, chunkCreated = series.appendFloatHistogram(0, s.T, s.FH, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1786,7 +1792,7 @@ func (a *headAppenderBase) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } @@ -1797,7 +1803,7 @@ func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histo chunkCreated = true } - ok := c.chunk.Insert(t, v, h, fh) + ok := c.chunk.Insert(st, t, v, h, fh) if ok { if chunkCreated || t < c.minTime { c.minTime = t @@ -1820,15 +1826,17 @@ type chunkOpts struct { // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // Series lock must be held when calling. -func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o) +func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { + encoding := chunkenc.EncXOR + if st != 0 { + encoding = chunkenc.EncXORST + } + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, encoding, o) if !sampleInOrder { return sampleInOrder, chunkCreated } - // TODO(krajorama): pass ST. - s.app.Append(0, t, v) - - c.maxTime = t + var newChunk chunkenc.Chunk + newChunk, s.app = s.app.Append(st, t, v) s.lastValue = v s.lastHistogramValue = nil @@ -1838,6 +1846,19 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa s.txs.add(appendID) } + if newChunk == nil { // Sample was appended to existing chunk or is the first sample in a new chunk. + c.maxTime = t + return true, chunkCreated + } + + s.headChunks = &memChunk{ + chunk: newChunk, + minTime: t, + maxTime: t, + prev: s.headChunks, + } + s.nextAt = rangeForTimestamp(t, o.chunkRange) + return true, chunkCreated } @@ -1846,14 +1867,19 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. // To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.HistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o) + encoding := chunkenc.EncHistogram + if st != 0 { + // TODO(krajorama): handle ST != 0 case. + encoding = chunkenc.EncHistogram // ST + } + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, encoding, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1868,8 +1894,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui prevApp = nil } - // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed + newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed s.lastHistogramValue = h s.lastFloatHistogramValue = nil @@ -1904,14 +1929,19 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. // To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o) + encoding := chunkenc.EncFloatHistogram + if st != 0 { + // TODO(krajorama): handle ST != 0 case. + encoding = chunkenc.EncFloatHistogram // ST + } + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, encoding, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1926,8 +1956,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, prevApp = nil } - // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed. + newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed. s.lastHistogramValue = nil s.lastFloatHistogramValue = fh @@ -1990,9 +2019,11 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts chunkCreated = true } - if c.chunk.Encoding() != e { - // The chunk encoding expected by this append is different than the head chunk's - // encoding. So we cut a new chunk with the expected encoding. + if !c.chunk.Encoding().Compatible(e) { + // The chunk encoding expected by this append is different than the head + // chunk's encoding. Or start timestamp is requested, but cannot be + // stored in the existing chunk encoding. + // So we cut a new chunk with the expected encoding. c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2047,9 +2078,11 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o return c, false, chunkCreated } - if c.chunk.Encoding() != e { - // The chunk encoding expected by this append is different than the head chunk's - // encoding. So we cut a new chunk with the expected encoding. + if !c.chunk.Encoding().Compatible(e) { + // The chunk encoding expected by this append is different than the head + // chunk's encoding. Or start timestamp is requested, but cannot be + // stored in the existing chunk encoding. + // So we cut a new chunk with the expected encoding. c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2134,7 +2167,11 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange panic(err) // This should never happen. } } else { - s.headChunks.chunk = chunkenc.NewXORChunk() + var err error + s.headChunks.chunk, err = chunkenc.NewEmptyChunk(chunkenc.EncXORST) + if err != nil { + panic(err) // This should never happen. + } } // Set upper bound on when the next chunk must be started. An earlier timestamp diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index cf55973a01..0849c257b5 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -33,7 +33,7 @@ func TestMemSeries_chunk(t *testing.T) { appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) { for i := start; i < end; i += chunkStep { - ok, _ := s.append(i, float64(i), 0, chunkOpts{ + ok, _ := s.append(0, i, float64(i), 0, chunkOpts{ chunkDiskMapper: cdm, chunkRange: chunkRange, samplesPerChunk: DefaultSamplesPerChunk, diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 493f938860..f40074e994 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -346,7 +346,7 @@ func BenchmarkLoadWLs(b *testing.B) { for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false) - s.append(c.mmappedChunkT, 42, 0, cOpts) + s.append(0, c.mmappedChunkT, 42, 0, cOpts) // There's only one head chunk because only a single sample is appended. mmapChunks() // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with // the sample at c.mmappedChunkT is mmapped. @@ -1438,7 +1438,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0, cOpts) + ok, _ := s.append(0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } s.mmapChunks(chunkDiskMapper) @@ -1588,7 +1588,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { if tc.mmappedChunks > 0 { headStart = (tc.mmappedChunks + 1) * chunkRange for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep { - ok, _ := series.append(int64(i), float64(i), 0, cOpts) + ok, _ := series.append(0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } series.mmapChunks(chunkDiskMapper) @@ -1598,7 +1598,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { series.headChunks = nil } else { for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep { - ok, _ := series.append(int64(i), float64(i), 0, cOpts) + ok, _ := series.append(0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed: %d", i) } } @@ -2146,20 +2146,20 @@ func TestMemSeries_append(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.append(998, 1, 0, cOpts) + ok, chunkCreated := s.append(0, 998, 1, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0, cOpts) + ok, chunkCreated = s.append(0, 999, 2, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") s.mmapChunks(chunkDiskMapper) - ok, chunkCreated = s.append(1000, 3, 0, cOpts) + ok, chunkCreated = s.append(0, 1000, 3, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0, cOpts) + ok, chunkCreated = s.append(0, 1001, 4, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -2173,7 +2173,7 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts) + ok, _ := s.append(0, 1001+int64(i), float64(i), 0, cOpts) require.True(t, ok, "append failed") } s.mmapChunks(chunkDiskMapper) @@ -2188,6 +2188,87 @@ func TestMemSeries_append(t *testing.T) { } } +func TestMemSeries_appendST(t *testing.T) { + t.Run("float samples", func(t *testing.T) { + testMemSeriesAppendST(t, chunkenc.ValFloat, chunkenc.EncXOR, chunkenc.EncXORST) + }) + t.Run("histogram samples", func(t *testing.T) { + testMemSeriesAppendST(t, chunkenc.ValHistogram, chunkenc.EncHistogram, chunkenc.EncHistogram) + }) + t.Run("float histogram samples", func(t *testing.T) { + testMemSeriesAppendST(t, chunkenc.ValFloatHistogram, chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogram) + }) +} + +func testMemSeriesAppendST(t *testing.T, valueType chunkenc.ValueType, noSTenc, stEnc chunkenc.Encoding) { + // Once we switch to ST encoding, we stay on it, until chunk is full. + expectedMixEncoding := []chunkenc.Encoding{noSTenc, stEnc} + if stEnc == noSTenc { + // TODO(krajorama): Remove this code when ST encoding for histograms is implemented. + expectedMixEncoding = []chunkenc.Encoding{noSTenc} + } + testCases := []struct { + name string + sts []int64 + expectedEncoding []chunkenc.Encoding + }{ + { + name: "no st", + sts: []int64{0, 0, 0, 0}, + expectedEncoding: []chunkenc.Encoding{noSTenc}, + }, + { + name: "with st", + sts: []int64{1, 1, 1, 1}, + expectedEncoding: []chunkenc.Encoding{stEnc}, + }, + { + name: "mixed st", + sts: []int64{0, 1, 0, 1, 0, 1}, + expectedEncoding: expectedMixEncoding, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cOpts := chunkOpts{ + chunkRange: int64(1000), + samplesPerChunk: DefaultSamplesPerChunk, + } + + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false) + + for i, st := range tc.sts { + var ok bool + switch valueType { + case chunkenc.ValFloat: + ok, _ = s.append(st, int64(i), float64(i), 0, cOpts) + case chunkenc.ValHistogram: + hist := tsdbutil.GenerateTestHistograms(1)[0] + ok, _ = s.appendHistogram(st, int64(i), hist, 0, cOpts) + case chunkenc.ValFloatHistogram: + fhist := tsdbutil.GenerateTestFloatHistograms(1)[0] + ok, _ = s.appendFloatHistogram(st, int64(i), fhist, 0, cOpts) + default: + require.Fail(t, "unsupported value type") + } + require.Truef(t, ok, "append failed at index %d", i) + } + + chunks := []*memChunk{} + chk := s.headChunks + for chk != nil { + chunks = append(chunks, chk) + chk = chk.prev + } + slices.Reverse(chunks) + require.Len(t, chunks, len(tc.expectedEncoding), "expected number of chunks") + for i, enc := range tc.expectedEncoding { + require.Equal(t, enc, chunks[i].chunk.Encoding(), "unexpected chunk encoding at index %d", i) + } + }) + } +} + func TestMemSeries_appendHistogram(t *testing.T) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. @@ -2214,19 +2295,19 @@ func TestMemSeries_appendHistogram(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts) + ok, chunkCreated := s.appendHistogram(0, 998, histograms[0], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts) + ok, chunkCreated = s.appendHistogram(0, 999, histograms[1], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts) + ok, chunkCreated = s.appendHistogram(0, 1000, histograms[2], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts) + ok, chunkCreated = s.appendHistogram(0, 1001, histograms[3], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -2237,7 +2318,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range") require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range") - ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts) + ok, chunkCreated = s.appendHistogram(0, 1002, histogramWithOneMoreBucket, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") @@ -2272,7 +2353,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { var nextTs int64 var totalAppendedSamples int for i := range samplesPerChunk / 4 { - ok, _ := s.append(nextTs, float64(i), 0, cOpts) + ok, _ := s.append(0, nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "slow sample %d was not appended", i) nextTs += slowRate totalAppendedSamples++ @@ -2281,12 +2362,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { // Suddenly, the rate increases and we receive a sample every millisecond. for i := range math.MaxUint16 { - ok, _ := s.append(nextTs, float64(i), 0, cOpts) + ok, _ := s.append(0, nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "quick sample %d was not appended", i) nextTs++ totalAppendedSamples++ } - ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts) + ok, chunkCreated := s.append(0, DefaultBlockDuration, float64(0), 0, cOpts) require.True(t, ok, "new chunk sample was not appended") require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") @@ -2315,18 +2396,18 @@ func TestGCChunkAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, cOpts) + ok, chunkCreated := s.append(0, 0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, cOpts) + ok, chunkCreated = s.append(0, 999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, cOpts) + ok, chunkCreated = s.append(0, 1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, cOpts) + ok, chunkCreated = s.append(0, 1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -2371,18 +2452,18 @@ func TestGCSeriesAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, cOpts) + ok, chunkCreated := s.append(0, 0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, cOpts) + ok, chunkCreated = s.append(0, 999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, cOpts) + ok, chunkCreated = s.append(0, 1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, cOpts) + ok, chunkCreated = s.append(0, 1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -2713,10 +2794,10 @@ func TestHeadReadWriterRepair(t *testing.T) { require.True(t, created, "series was not created") for i := range 7 { - ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts) + ok, chunkCreated := s.append(0, int64(i*chunkRange), float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunk was not created") - ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts) + ok, chunkCreated = s.append(0, int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") h.chunkDiskMapper.CutNewFile() @@ -3056,7 +3137,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) - ok, _ := s.append(0, 0, 0, cOpts) + ok, _ := s.append(0, 0, 0, 0, cOpts) require.True(t, ok, "Series append failed.") require.Equal(t, 0, int(s.txs.txIDCount), "Series should not have an appendID after append with appendID=0.") } @@ -3616,7 +3697,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false) for i := range 7 { - ok, _ := s.append(int64(i), float64(i), 0, cOpts) + ok, _ := s.append(0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index b323f0dbf6..07ce86854e 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -667,7 +667,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp h.numStaleSeries.Dec() } - if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { + // TODO(krajorama): Pass ST when available in WAL records. + if _, chunkCreated := ms.append(0, s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() _ = ms.mmapChunks(h.chunkDiskMapper) @@ -704,14 +705,16 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum) } - _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) + // TODO(krajorama): Pass ST when available in WAL records. + _, chunkCreated = ms.appendHistogram(0, s.t, s.h, 0, appendChunkOpts) } else { newlyStale = value.IsStaleNaN(s.fh.Sum) if ms.lastFloatHistogramValue != nil { newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum) } - _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) + // TODO(krajorama): Pass ST when available in WAL records. + _, chunkCreated = ms.appendFloatHistogram(0, s.t, s.fh, 0, appendChunkOpts) } if newlyStale { h.numStaleSeries.Inc() @@ -1097,7 +1100,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR missingSeries[s.Ref] = struct{}{} continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama): Pass ST when available in WBL records. + ok, chunkCreated, _ := ms.insert(0, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -1125,9 +1129,11 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR var chunkCreated bool var ok bool if s.h != nil { - ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama): Pass ST when available in WBL records. + ok, chunkCreated, _ = ms.insert(0, s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) } else { - ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama): Pass ST when available in WBL records. + ok, chunkCreated, _ = ms.insert(0, s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) } if chunkCreated { h.metrics.chunksCreated.Inc() diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index f9746c4c61..d5797d9c1e 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -34,14 +34,13 @@ func NewOOOChunk() *OOOChunk { // Insert inserts the sample such that order is maintained. // Returns false if insert was not possible due to the same timestamp already existing. -func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { +func (o *OOOChunk) Insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { // Although out-of-order samples can be out-of-order amongst themselves, we // are opinionated and expect them to be usually in-order meaning we could // try to append at the end first if the new timestamp is higher than the // last known timestamp. if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t { - // TODO(krajorama): pass ST. - o.samples = append(o.samples, sample{0, t, v, h, fh}) + o.samples = append(o.samples, sample{st, t, v, h, fh}) return true } @@ -50,8 +49,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog if i >= len(o.samples) { // none found. append it at the end - // TODO(krajorama): pass ST. - o.samples = append(o.samples, sample{0, t, v, h, fh}) + o.samples = append(o.samples, sample{st, t, v, h, fh}) return true } @@ -63,8 +61,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog // Expand length by 1 to make room. use a zero sample, we will overwrite it anyway. o.samples = append(o.samples, sample{}) copy(o.samples[i+1:], o.samples[i:]) - // TODO(krajorama): pass ST. - o.samples[i] = sample{0, t, v, h, fh} + o.samples[i] = sample{st, t, v, h, fh} return true } @@ -97,29 +94,26 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error break } encoding := chunkenc.EncXOR - if s.h != nil { - encoding = chunkenc.EncHistogram - } else if s.fh != nil { + switch { + case s.fh != nil: encoding = chunkenc.EncFloatHistogram + case s.h != nil: + encoding = chunkenc.EncHistogram + case s.st != 0: + encoding = chunkenc.EncXORST } // prevApp is the appender for the previous sample. prevApp := app - if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram + if !prevEncoding.Compatible(encoding) { // For the first sample, this will always be true as EncNone != anything. if prevEncoding != chunkenc.EncNone { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) } cmint = s.t - switch encoding { - case chunkenc.EncXOR: - chunk = chunkenc.NewXORChunk() - case chunkenc.EncHistogram: - chunk = chunkenc.NewHistogramChunk() - case chunkenc.EncFloatHistogram: - chunk = chunkenc.NewFloatHistogramChunk() - default: - chunk = chunkenc.NewXORChunk() + chunk, err = chunkenc.NewEmptyChunk(encoding) + if err != nil { + return chks, err } app, err = chunk.Appender() if err != nil { @@ -127,9 +121,14 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error } } switch encoding { - case chunkenc.EncXOR: - // TODO(krajorama): pass ST. - app.Append(0, s.t, s.f) + case chunkenc.EncXOR, chunkenc.EncXORST: + var newChunk chunkenc.Chunk + newChunk, app = app.Append(s.st, s.t, s.f) + if newChunk != nil { // A new chunk was allocated. + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + cmint = s.t + chunk = newChunk + } case chunkenc.EncHistogram: // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevHApp, _ := prevApp.(*chunkenc.HistogramAppender) @@ -137,8 +136,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error newChunk chunkenc.Chunk recoded bool ) - // TODO(krajorama): pass ST. - newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, 0, s.t, s.h, false) + newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.st, s.t, s.h, false) if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) @@ -153,8 +151,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error newChunk chunkenc.Chunk recoded bool ) - // TODO(krajorama): pass ST. - newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, 0, s.t, s.fh, false) + newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.st, s.t, s.fh, false) if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index 99cd357a30..0718576e86 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -85,7 +85,7 @@ func testOOOInsert(t *testing.T, chunk.samples = make([]sample, numPreExisting) chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc) newSample := sampleFunc(valOdd(insertPos)) - chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh) + chunk.Insert(0, newSample.t, newSample.f, newSample.h, newSample.fh) var expSamples []sample // Our expected new samples slice, will be first the original samples. @@ -131,6 +131,13 @@ func TestOOOInsertDuplicate(t *testing.T) { t.Run(name, func(t *testing.T) { testOOOInsertDuplicate(t, scenario.sampleFunc) }) + t.Run(name+"_with_ST", func(t *testing.T) { + testOOOInsertDuplicate(t, func(ts int64) sample { + s := scenario.sampleFunc(ts) + s.st = ts + 1000 // Arbitrary ST to differ from t. + return s + }) + }) } } @@ -144,8 +151,9 @@ func testOOOInsertDuplicate(t *testing.T, dupSample := chunk.samples[dupPos] dupSample.f = 0.123 + dupSample.st += 10 // Change ST to ensure we are only testing timestamp duplication. - ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh) + ok := chunk.Insert(dupSample.st, dupSample.t, dupSample.f, dupSample.h, dupSample.fh) expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change. require.False(t, ok) @@ -241,6 +249,75 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { {encoding: chunkenc.EncHistogram, minTime: 0, maxTime: 1}, }, }, + "floats with ST": { + samples: []sample{ + {st: 5, t: 1000, f: 43.0}, + {st: 1005, t: 1100, f: 42.0}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncXORST, minTime: 1000, maxTime: 1100}, + }, + }, + "histograms with ST": { + samples: []sample{ + {st: 5, t: 1000, h: h1}, + {st: 1005, t: 1100, h: h2}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset}, + expectedChunks: []chunkVerify{ + // TODO(krajorama): Change when ST encoding for histograms is implemented. + {encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1100}, + }, + }, + "float histograms with ST": { + samples: []sample{ + {st: 5, t: 1000, fh: tsdbutil.GenerateTestFloatHistogram(1)}, + {st: 1005, t: 1100, fh: tsdbutil.GenerateTestFloatHistogram(2)}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset}, + expectedChunks: []chunkVerify{ + // TODO(krajorama): Change when ST encoding for float histograms is implemented. + {encoding: chunkenc.EncFloatHistogram, minTime: 1000, maxTime: 1100}, + }, + }, + "floats with mixed ST": { + samples: []sample{ + {t: 1000, f: 43.0}, + {st: 1005, t: 1100, f: 42.0}, + {t: 1200, f: 41.0}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset, histogram.UnknownCounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncXOR, minTime: 1000, maxTime: 1000}, + // Once we switched to XORST encoding, we stay on it. + {encoding: chunkenc.EncXORST, minTime: 1100, maxTime: 1200}, + }, + }, + "histograms with mixed ST": { + samples: []sample{ + {t: 1000, h: tsdbutil.GenerateTestHistogram(1)}, + {st: 1005, t: 1100, h: tsdbutil.GenerateTestHistogram(2)}, + {t: 1200, h: tsdbutil.GenerateTestHistogram(3)}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset}, + expectedChunks: []chunkVerify{ + // TODO(krajorama): Change when ST encoding for histograms is implemented. + {encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1200}, + }, + }, + "float histograms with mixed ST": { + samples: []sample{ + {t: 1000, fh: tsdbutil.GenerateTestFloatHistogram(1)}, + {st: 1005, t: 1100, fh: tsdbutil.GenerateTestFloatHistogram(2)}, + {t: 1200, fh: tsdbutil.GenerateTestFloatHistogram(3)}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset}, + expectedChunks: []chunkVerify{ + // TODO(krajorama): Change when ST encoding for float histograms is implemented. + {encoding: chunkenc.EncFloatHistogram, minTime: 1000, maxTime: 1200}, + }, + }, } for name, tc := range testCases { @@ -252,11 +329,11 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { for _, s := range tc.samples { switch s.Type() { case chunkenc.ValFloat: - oooChunk.Insert(s.t, s.f, nil, nil) + oooChunk.Insert(s.st, s.t, s.f, nil, nil) case chunkenc.ValHistogram: - oooChunk.Insert(s.t, 0, s.h.Copy(), nil) + oooChunk.Insert(s.st, s.t, 0, s.h.Copy(), nil) case chunkenc.ValFloatHistogram: - oooChunk.Insert(s.t, 0, nil, s.fh.Copy()) + oooChunk.Insert(s.st, s.t, 0, nil, s.fh.Copy()) default: t.Fatalf("unexpected sample type %d", s.Type()) } @@ -284,6 +361,15 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { // XOR chunks don't have counter reset hints, so we shouldn't expect anything else than UnknownCounterReset. require.Equal(t, histogram.UnknownCounterReset, tc.expectedCounterResets[sampleIndex+j], "sample reset hint %d", sampleIndex+j) require.Equal(t, tc.samples[sampleIndex+j].f, s.F(), "sample %d", sampleIndex+j) + require.Equal(t, int64(0), s.ST(), "sample ST %d", sampleIndex+j) + } + case chunkenc.EncXORST: + for j, s := range samples { + require.Equal(t, chunkenc.ValFloat, s.Type()) + // XOR chunks don't have counter reset hints, so we shouldn't expect anything else than UnknownCounterReset. + require.Equal(t, histogram.UnknownCounterReset, tc.expectedCounterResets[sampleIndex+j], "sample reset hint %d", sampleIndex+j) + require.Equal(t, tc.samples[sampleIndex+j].f, s.F(), "sample %d", sampleIndex+j) + require.Equal(t, tc.samples[sampleIndex+j].st, s.ST(), "sample ST %d", sampleIndex+j) } case chunkenc.EncHistogram: for j, s := range samples { diff --git a/tsdb/querier.go b/tsdb/querier.go index ce0292bf24..34411dd19d 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -867,7 +867,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { // populateCurrForSingleChunk sets the fields within p.currMetaWithChunk. This // should be called if the samples in p.currDelIter only form one chunk. -// TODO(krajorama): test ST when chunks support it. func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { valueType := p.currDelIter.Next() if valueType == chunkenc.ValNone { @@ -906,7 +905,11 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { } } case chunkenc.ValFloat: - newChunk = chunkenc.NewXORChunk() + if p.currMeta.Chunk.Encoding() == chunkenc.EncXOR { + newChunk = chunkenc.NewXORChunk() + } else { + newChunk = chunkenc.NewXORSTChunk() + } if app, err = newChunk.Appender(); err != nil { break } @@ -918,7 +921,11 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { var v float64 t, v = p.currDelIter.At() st = p.currDelIter.AtST() - app.Append(st, t, v) + newNewChunk, _ := app.Append(st, t, v) + if newNewChunk != nil { + err = errors.New("unexpected chunk split when re-encoding float chunk") + break + } } case chunkenc.ValFloatHistogram: newChunk = chunkenc.NewFloatHistogramChunk() @@ -959,7 +966,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { // populateChunksFromIterable reads the samples from currDelIter to create // chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first // chunk. -// TODO(krajorama): test ST when chunks support it. func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { p.chunksFromIterable = p.chunksFromIterable[:0] p.chunksFromIterableIdx = -1 @@ -983,15 +989,17 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { app chunkenc.Appender - newChunk chunkenc.Chunk - recoded bool - err error ) prevValueType := chunkenc.ValNone for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() { + var ( + newChunk chunkenc.Chunk + recoded bool + ) + // Check if the encoding has changed (i.e. we need to create a new // chunk as chunks can't have multiple encoding types). // For the first sample, the following condition will always be true as @@ -1015,7 +1023,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { var v float64 t, v = p.currDelIter.At() st = p.currDelIter.AtST() - app.Append(st, t, v) + newChunk, app = app.Append(st, t, v) } case chunkenc.ValHistogram: { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 4387635959..176cbaba3e 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -160,7 +160,8 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe for _, smpl := range chk { require.Nil(t, smpl.h, "chunk can only contain one type of sample") require.Nil(t, smpl.fh, "chunk can only contain one type of sample") - app.Append(0, smpl.t, smpl.f) + // Not checking for new chunk as we supply ST==0 always. + _, _ = app.Append(0, smpl.t, smpl.f) } chkReader[chunkRef] = chunk } @@ -2100,7 +2101,8 @@ func TestDeletedIterator(t *testing.T) { for i := range 1000 { act[i].t = int64(i) act[i].f = rand.Float64() - app.Append(0, act[i].t, act[i].f) + // Not checking for new chunk as we supply ST==0 always. + _, _ = app.Append(0, act[i].t, act[i].f) } cases := []struct { @@ -2160,7 +2162,8 @@ func TestDeletedIterator_WithSeek(t *testing.T) { for i := range 1000 { act[i].t = int64(i) act[i].f = float64(i) - app.Append(0, act[i].t, act[i].f) + // Not checking for new chunk as we supply ST==0 always. + _, _ = app.Append(0, act[i].t, act[i].f) } cases := []struct {