feat(tsdb): allow appending start timestamp

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2026-02-02 12:42:58 +01:00
parent cd76a2cec7
commit fb9c07de51
No known key found for this signature in database
GPG key ID: 47A8F9CE80FD7C7F
24 changed files with 546 additions and 148 deletions

View file

@ -3755,7 +3755,8 @@ func TestHistogramRateWithFloatStaleness(t *testing.T) {
app, err = c2.Appender()
require.NoError(t, err)
app.Append(0, 20, math.Float64frombits(value.StaleNaN))
newChunk, _ := app.Append(0, 20, math.Float64frombits(value.StaleNaN))
require.Nil(t, newChunk)
// Make a chunk with two normal histograms that have zero value.
h2 := histogram.Histogram{

View file

@ -1146,7 +1146,8 @@ func buildTestChunks(t *testing.T) []prompb.Chunk {
minTimeMs := time
for j := range numSamplesPerTestChunk {
a.Append(0, time, float64(i+j))
newChunk, _ := a.Append(0, time, float64(i+j))
require.Nil(t, newChunk)
time += int64(1000)
}

View file

@ -342,10 +342,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
st := seriesIter.AtST()
if typ != lastType || i >= seriesToChunkEncoderSplit {
// Create a new chunk if the sample type changed or too many samples in the current one.
chks = appendChunk(chks, mint, maxt, chk)
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(st != 0))
if err != nil {
return errChunksIterator{err: err}
}
@ -360,19 +361,24 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
lastType = typ
var (
st, t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
)
switch typ {
case chunkenc.ValFloat:
t, v = seriesIter.At()
st = seriesIter.AtST()
app.Append(st, t, v)
newChk, app = app.Append(st, t, v)
if newChk != nil {
chks = appendChunk(chks, mint, maxt, chk)
mint = int64(math.MaxInt64)
// maxt is immediately overwritten below which is why setting it here won't make a difference.
i = 0
chk = newChk
}
case chunkenc.ValHistogram:
t, h = seriesIter.AtHistogram(nil)
st = seriesIter.AtST()
newChk, recoded, app, err = app.AppendHistogram(nil, st, t, h, false)
if err != nil {
return errChunksIterator{err: err}
@ -388,7 +394,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
}
case chunkenc.ValFloatHistogram:
t, fh = seriesIter.AtFloatHistogram(nil)
st = seriesIter.AtST()
newChk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, fh, false)
if err != nil {
return errChunksIterator{err: err}

View file

@ -276,6 +276,15 @@ For profiles:
*/
func BenchmarkAppender(b *testing.B) {
foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) {
samples := make([]triple, len(s.samples))
copy(samples, s.samples)
if f.stUnsupported {
// If the format does not support ST, zero them out for appending.
for i := range samples {
samples[i].st = 0
}
}
b.ReportAllocs()
for b.Loop() {
@ -285,8 +294,10 @@ func BenchmarkAppender(b *testing.B) {
if err != nil {
b.Fatalf("get appender: %s", err)
}
for _, p := range s.samples {
a.Append(p.st, p.t, p.v)
for _, p := range samples {
// We are ignoring potential new chunk return here, as we'll
// check the number of samples in the chunk after all appends.
_, _ = a.Append(p.st, p.t, p.v)
}
// NOTE: Some buffered implementations only encode on Bytes().
b.ReportMetric(float64(len(c.Bytes())), "B/chunk")
@ -317,6 +328,15 @@ For profiles:
*/
func BenchmarkIterator(b *testing.B) {
foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) {
samples := make([]triple, len(s.samples))
copy(samples, s.samples)
if f.stUnsupported {
// If the format does not support ST, zero them out for appending.
for i := range samples {
samples[i].st = 0
}
}
floatEquals := func(a, b float64) bool {
return a == b
}
@ -333,8 +353,11 @@ func BenchmarkIterator(b *testing.B) {
if err != nil {
b.Fatalf("get appender: %s", err)
}
for _, p := range s.samples {
a.Append(p.st, p.t, p.v)
for _, p := range samples {
newChunk, _ := a.Append(p.st, p.t, p.v)
if newChunk != nil {
b.Fatalf("unexpected new chunk allocation")
}
}
// Some chunk implementations might be buffered. Reset to ensure we don't reuse
@ -352,16 +375,8 @@ func BenchmarkIterator(b *testing.B) {
if err := it.Err(); err != nil && !errors.Is(err, io.EOF) {
require.NoError(b, err)
}
expectedSamples := s.samples
if f.stUnsupported {
// If the format does not support ST, zero them out for comparison.
expectedSamples = make([]triple, len(s.samples))
copy(expectedSamples, s.samples)
for i := range s.samples {
expectedSamples[i].st = 0
}
}
if diff := cmp.Diff(expectedSamples, got, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" {
if diff := cmp.Diff(samples, got, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" {
b.Fatalf("mismatch (-want +got):\n%s", diff)
}

View file

@ -49,6 +49,14 @@ func (e Encoding) String() string {
return "<unknown>"
}
// Compatible reports whether next encoding is compatible with the current
// encoding e. It's true if they are the same or the current encoding supports
// start timestamps and the next encoding is the same encoding without start
// timestamps.
func (e Encoding) Compatible(next Encoding) bool {
return e == next || (e == EncXORST && next == EncXOR)
}
// IsValidEncoding returns true for supported encodings.
func IsValidEncoding(e Encoding) bool {
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXORST
@ -104,7 +112,10 @@ type Iterable interface {
// Appender adds sample with start timestamp, timestamp, and value to a chunk.
type Appender interface {
Append(st, t int64, v float64)
// Append appends a sample with start timestamp st, timestamp t, and value v to the chunk.
// Returns a new Chunk if the sample could not be appended to the current
// chunk because ST storage is required but not supported by the current chunk.
Append(st, t int64, v float64) (c Chunk, app Appender)
// AppendHistogram and AppendFloatHistogram append a histogram sample to a histogram or float histogram chunk.
// Appending a histogram may require creating a completely new chunk or recoding (changing) the current chunk.
@ -189,9 +200,12 @@ func (v ValueType) String() string {
}
}
func (v ValueType) ChunkEncoding() Encoding {
func (v ValueType) ChunkEncoding(storeST bool) Encoding {
switch v {
case ValFloat:
if storeST {
return EncXORST
}
return EncXOR
case ValHistogram:
return EncHistogram

View file

@ -70,11 +70,13 @@ func testChunk(t *testing.T, c Chunk, supportsST bool) {
require.NoError(t, err)
}
app.Append(ts-100, ts, v)
expST := int64(0)
if supportsST {
expST = ts - 100
}
newChunk, _ := app.Append(expST, ts, v)
require.Nil(t, newChunk, "unexpected new chunk allocation")
exp = append(exp, triple{st: expST, t: ts, v: v})
}

View file

@ -195,7 +195,7 @@ func (a *FloatHistogramAppender) NumSamples() int {
// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk.
func (*FloatHistogramAppender) Append(int64, int64, float64) {
func (*FloatHistogramAppender) Append(int64, int64, float64) (Chunk, Appender) {
panic("appended a float sample to a histogram chunk")
}

View file

@ -219,7 +219,7 @@ func (a *HistogramAppender) NumSamples() int {
// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk.
func (*HistogramAppender) Append(int64, int64, float64) {
func (*HistogramAppender) Append(int64, int64, float64) (Chunk, Appender) {
panic("appended a float sample to a histogram chunk")
}

View file

@ -30,7 +30,8 @@ func testChunkSTHandling(t *testing.T, vt ValueType, chunkFactory func() Chunk)
sampleAppend := func(app Appender, vt ValueType, st, ts int64, v float64) {
switch vt {
case ValFloat:
app.Append(st, ts, v)
newChunk, _ := app.Append(st, ts, v)
require.Nil(t, newChunk)
case ValHistogram:
_, recoded, _, err := app.AppendHistogram(nil, st, ts, &histogram.Histogram{Sum: v, Count: uint64(v * 10)}, false)
require.NoError(t, err)

View file

@ -158,7 +158,18 @@ type xorAppender struct {
trailing uint8
}
func (a *xorAppender) Append(_, t int64, v float64) {
func (a *xorAppender) Append(st, t int64, v float64) (Chunk, Appender) {
if st != 0 {
c := NewXORSTChunk()
app, err := c.Appender()
if err != nil {
// This should never happen, we just created the chunk.
panic("unexpected error creating XORST appender: " + err.Error())
}
app.Append(st, t, v)
return c, app
}
var tDelta uint64
num := binary.BigEndian.Uint16(a.b.bytes())
switch num {
@ -213,6 +224,7 @@ func (a *xorAppender) Append(_, t int64, v float64) {
a.v = v
binary.BigEndian.PutUint16(a.b.bytes(), num+1)
a.tDelta = tDelta
return nil, a
}
// bitRange returns whether the given integer can be represented by nbits.

View file

@ -14,6 +14,7 @@
package chunkenc
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
@ -24,7 +25,8 @@ func BenchmarkXorRead(b *testing.B) {
app, err := c.Appender()
require.NoError(b, err)
for i := int64(0); i < 120*1000; i += 1000 {
app.Append(0, i, float64(i)+float64(i)/10+float64(i)/100+float64(i)/1000)
newChunk, _ := app.Append(0, i, float64(i)+float64(i)/10+float64(i)/100+float64(i)/1000)
require.Nil(b, newChunk)
}
b.ReportAllocs()
@ -40,3 +42,52 @@ func BenchmarkXorRead(b *testing.B) {
_, _ = ts, v
}
}
func TestXORChunk_AppendST(t *testing.T) {
for stStartAt := range 5 {
t.Run(fmt.Sprintf("start ST at sample %d", stStartAt), func(t *testing.T) {
c := NewXORChunk()
chunks := []Chunk{c}
app, err := c.Appender()
require.NoError(t, err)
timestamp := func(i int) int64 { return int64((i + 1) * 5000) }
st := func(i int) int64 {
if i >= stStartAt {
return 1000
}
return 0
}
for i := range 4 {
newChunk, newApp := app.Append(st(i), timestamp(i), float64(i))
if i == stStartAt {
require.NotNil(t, newChunk, "expected new chunk allocation")
require.NotEqual(t, app, newApp, "expected new app allocation")
app = newApp
chunks = append(chunks, newChunk)
} else {
require.Nil(t, newChunk, "unexpected new chunk allocation")
require.Equal(t, app, newApp, "unexpected new app allocation")
}
}
if stStartAt < 4 {
require.Len(t, chunks, 2, "expected two chunks to be created")
} else {
require.Len(t, chunks, 1, "expected only one chunk to be created")
}
// Verify samples.
count := 0
for _, chk := range chunks {
var it Iterator
it = chk.Iterator(it)
for it.Next() != ValNone {
ts, v := it.At()
require.Equal(t, float64(count), v, "value mismatch at timestamp %d", ts)
require.Equal(t, timestamp(count), ts, "timestamp mismatch at count %d", count)
require.Equal(t, st(count), it.AtST(), "ST mismatch at timestamp %d", ts)
count++
}
}
})
}
}

View file

@ -228,7 +228,7 @@ func (it *xorSTIterator) Reset(b []byte) {
it.err = nil
}
func (a *xorSTAppender) Append(st, t int64, v float64) {
func (a *xorSTAppender) Append(st, t int64, v float64) (Chunk, Appender) {
var (
stDelta int64
tDelta uint64
@ -348,6 +348,7 @@ func (a *xorSTAppender) Append(st, t int64, v float64) {
a.numTotal++
binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)
return nil, a
}
func (it *xorSTIterator) retErr(err error) ValueType {

View file

@ -33,7 +33,8 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) {
app, err := chunk.Appender()
require.NoError(t, err)
for i := range afterMax {
app.Append(0, int64(i*10+1), float64(i)*1.5)
newChunk, _ := app.Append(0, int64(i*10+1), float64(i)*1.5)
require.Nil(t, newChunk)
}
it := chunk.Iterator(nil)
@ -59,7 +60,8 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) {
if i == afterMax-1 {
st = int64((afterMax - 1) * 10)
}
app.Append(st, int64(i*10+1), float64(i)*1.5)
newChunk, _ := app.Append(st, int64(i*10+1), float64(i)*1.5)
require.Nil(t, newChunk)
}
it := chunk.Iterator(nil)

View file

@ -135,12 +135,12 @@ type Meta struct {
}
// ChunkFromSamples requires all samples to have the same type.
// TODO(krajorama): test with ST when chunk formats support it.
func ChunkFromSamples(s []Sample) (Meta, error) {
return ChunkFromSamplesGeneric(SampleSlice(s))
}
// ChunkFromSamplesGeneric requires all samples to have the same type.
// Should only be used in tests.
func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
emptyChunk := Meta{Chunk: chunkenc.NewXORChunk()}
mint, maxt := int64(0), int64(0)
@ -153,8 +153,17 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
return emptyChunk, nil
}
// Check if any sample has a non-zero start time (ST).
storeST := false
for i := 0; i < s.Len(); i++ {
if s.Get(i).ST() != 0 {
storeST = true
break
}
}
sampleType := s.Get(0).Type()
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding())
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(storeST))
if err != nil {
return Meta{}, err
}
@ -165,7 +174,10 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
for i := 0; i < s.Len(); i++ {
switch sampleType {
case chunkenc.ValFloat:
ca.Append(s.Get(i).ST(), s.Get(i).T(), s.Get(i).F())
newChunk, ca = ca.Append(s.Get(i).ST(), s.Get(i).T(), s.Get(i).F())
if newChunk != nil {
return emptyChunk, errors.New("did not expect to start a second chunk")
}
case chunkenc.ValHistogram:
newChunk, _, ca, err = ca.AppendHistogram(nil, s.Get(i).ST(), s.Get(i).T(), s.Get(i).H(), false)
if err != nil {

View file

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
@ -58,3 +59,64 @@ func TestWriterWithDefaultSegmentSize(t *testing.T) {
require.NoError(t, err)
require.Len(t, d, 1, "expected only one segment to be created to hold both chunks")
}
func TestChunkFromSamplesGenericWithST(t *testing.T) {
testCases := []struct {
name string
samples []Sample
}{
{
name: "all samples have ST==0",
samples: []Sample{
sample{t: 10, f: 1.0},
sample{t: 20, f: 2.0},
sample{t: 30, f: 3.0},
sample{t: 40, f: 4.0},
},
},
{
name: "all samples have ST!=0",
samples: []Sample{
sample{st: 5, t: 10, f: 1.0},
sample{st: 15, t: 20, f: 2.0},
sample{st: 25, t: 30, f: 3.0},
sample{st: 35, t: 40, f: 4.0},
},
},
{
name: "half samples have ST and half not",
samples: []Sample{
sample{t: 10, f: 1.0},
sample{st: 15, t: 20, f: 2.0},
sample{t: 30, f: 3.0},
sample{st: 35, t: 40, f: 4.0},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
meta, err := ChunkFromSamplesGeneric(SampleSlice(tc.samples))
require.NoError(t, err)
require.NotNil(t, meta.Chunk, "expected a single chunk to be returned")
// Verify MinTime and MaxTime
require.Equal(t, tc.samples[0].T(), meta.MinTime)
require.Equal(t, tc.samples[len(tc.samples)-1].T(), meta.MaxTime)
// Iterate through the chunk and verify values
it := meta.Chunk.Iterator(nil)
idx := 0
for it.Next() != chunkenc.ValNone {
ts, val := it.At()
st := it.AtST()
require.Equal(t, tc.samples[idx].ST(), st, "ST mismatch at index %d", idx)
require.Equal(t, tc.samples[idx].T(), ts, "T mismatch at index %d", idx)
require.Equal(t, tc.samples[idx].F(), val, "F mismatch at index %d", idx)
idx++
}
require.NoError(t, it.Err())
require.Equal(t, len(tc.samples), idx, "number of samples mismatch")
})
}
}

View file

@ -559,7 +559,8 @@ func randomChunk(t *testing.T) chunkenc.Chunk {
app, err := chunk.Appender()
require.NoError(t, err)
for range length {
app.Append(0, rand.Int63(), rand.Float64())
// Not checking for new chunk as we supply ST==0 always.
_, _ = app.Append(0, rand.Int63(), rand.Float64())
}
return chunk
}

View file

@ -1377,7 +1377,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
// TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true.
ok, chunkCreated, mmapRefs = series.insert(0, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1421,7 +1422,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
// TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true.
ok, chunkCreated = series.append(0, s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1482,7 +1484,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
// TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true.
ok, chunkCreated, mmapRefs = series.insert(0, s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1530,7 +1533,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
}
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
// TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true.
ok, chunkCreated = series.appendHistogram(0, s.T, s.H, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1591,7 +1595,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
// TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true.
ok, chunkCreated, mmapRefs = series.insert(0, s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1639,7 +1644,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
}
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
// TODO(krajorama): pass non zero ST if a.head.opts.EnableSTStorage is true.
ok, chunkCreated = series.appendFloatHistogram(0, s.T, s.FH, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1786,7 +1792,7 @@ func (a *headAppenderBase) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
func (s *memSeries) insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
@ -1797,7 +1803,7 @@ func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histo
chunkCreated = true
}
ok := c.chunk.Insert(t, v, h, fh)
ok := c.chunk.Insert(st, t, v, h, fh)
if ok {
if chunkCreated || t < c.minTime {
c.minTime = t
@ -1820,15 +1826,17 @@ type chunkOpts struct {
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
encoding := chunkenc.EncXOR
if st != 0 {
encoding = chunkenc.EncXORST
}
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, encoding, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
// TODO(krajorama): pass ST.
s.app.Append(0, t, v)
c.maxTime = t
var newChunk chunkenc.Chunk
newChunk, s.app = s.app.Append(st, t, v)
s.lastValue = v
s.lastHistogramValue = nil
@ -1838,6 +1846,19 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
s.txs.add(appendID)
}
if newChunk == nil { // Sample was appended to existing chunk or is the first sample in a new chunk.
c.maxTime = t
return true, chunkCreated
}
s.headChunks = &memChunk{
chunk: newChunk,
minTime: t,
maxTime: t,
prev: s.headChunks,
}
s.nextAt = rangeForTimestamp(t, o.chunkRange)
return true, chunkCreated
}
@ -1846,14 +1867,19 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o)
encoding := chunkenc.EncHistogram
if st != 0 {
// TODO(krajorama): handle ST != 0 case.
encoding = chunkenc.EncHistogram // ST
}
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, encoding, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1868,8 +1894,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
prevApp = nil
}
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed
s.lastHistogramValue = h
s.lastFloatHistogramValue = nil
@ -1904,14 +1929,19 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o)
encoding := chunkenc.EncFloatHistogram
if st != 0 {
// TODO(krajorama): handle ST != 0 case.
encoding = chunkenc.EncFloatHistogram // ST
}
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, encoding, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1926,8 +1956,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
prevApp = nil
}
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed.
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed.
s.lastHistogramValue = nil
s.lastFloatHistogramValue = fh
@ -1990,9 +2019,11 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
chunkCreated = true
}
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
if !c.chunk.Encoding().Compatible(e) {
// The chunk encoding expected by this append is different than the head
// chunk's encoding. Or start timestamp is requested, but cannot be
// stored in the existing chunk encoding.
// So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2047,9 +2078,11 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o
return c, false, chunkCreated
}
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
if !c.chunk.Encoding().Compatible(e) {
// The chunk encoding expected by this append is different than the head
// chunk's encoding. Or start timestamp is requested, but cannot be
// stored in the existing chunk encoding.
// So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2134,7 +2167,11 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
panic(err) // This should never happen.
}
} else {
s.headChunks.chunk = chunkenc.NewXORChunk()
var err error
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(chunkenc.EncXORST)
if err != nil {
panic(err) // This should never happen.
}
}
// Set upper bound on when the next chunk must be started. An earlier timestamp

View file

@ -33,7 +33,7 @@ func TestMemSeries_chunk(t *testing.T) {
appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) {
for i := start; i < end; i += chunkStep {
ok, _ := s.append(i, float64(i), 0, chunkOpts{
ok, _ := s.append(0, i, float64(i), 0, chunkOpts{
chunkDiskMapper: cdm,
chunkRange: chunkRange,
samplesPerChunk: DefaultSamplesPerChunk,

View file

@ -346,7 +346,7 @@ func BenchmarkLoadWLs(b *testing.B) {
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false)
s.append(c.mmappedChunkT, 42, 0, cOpts)
s.append(0, c.mmappedChunkT, 42, 0, cOpts)
// There's only one head chunk because only a single sample is appended. mmapChunks()
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
// the sample at c.mmappedChunkT is mmapped.
@ -1438,7 +1438,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
ok, _ := s.append(0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}
s.mmapChunks(chunkDiskMapper)
@ -1588,7 +1588,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
if tc.mmappedChunks > 0 {
headStart = (tc.mmappedChunks + 1) * chunkRange
for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep {
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
ok, _ := series.append(0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}
series.mmapChunks(chunkDiskMapper)
@ -1598,7 +1598,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
series.headChunks = nil
} else {
for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep {
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
ok, _ := series.append(0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed: %d", i)
}
}
@ -2146,20 +2146,20 @@ func TestMemSeries_append(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1, 0, cOpts)
ok, chunkCreated := s.append(0, 998, 1, 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2, 0, cOpts)
ok, chunkCreated = s.append(0, 999, 2, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
s.mmapChunks(chunkDiskMapper)
ok, chunkCreated = s.append(1000, 3, 0, cOpts)
ok, chunkCreated = s.append(0, 1000, 3, 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4, 0, cOpts)
ok, chunkCreated = s.append(0, 1001, 4, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -2173,7 +2173,7 @@ func TestMemSeries_append(t *testing.T) {
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts)
ok, _ := s.append(0, 1001+int64(i), float64(i), 0, cOpts)
require.True(t, ok, "append failed")
}
s.mmapChunks(chunkDiskMapper)
@ -2188,6 +2188,87 @@ func TestMemSeries_append(t *testing.T) {
}
}
func TestMemSeries_appendST(t *testing.T) {
t.Run("float samples", func(t *testing.T) {
testMemSeriesAppendST(t, chunkenc.ValFloat, chunkenc.EncXOR, chunkenc.EncXORST)
})
t.Run("histogram samples", func(t *testing.T) {
testMemSeriesAppendST(t, chunkenc.ValHistogram, chunkenc.EncHistogram, chunkenc.EncHistogram)
})
t.Run("float histogram samples", func(t *testing.T) {
testMemSeriesAppendST(t, chunkenc.ValFloatHistogram, chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogram)
})
}
func testMemSeriesAppendST(t *testing.T, valueType chunkenc.ValueType, noSTenc, stEnc chunkenc.Encoding) {
// Once we switch to ST encoding, we stay on it, until chunk is full.
expectedMixEncoding := []chunkenc.Encoding{noSTenc, stEnc}
if stEnc == noSTenc {
// TODO(krajorama): Remove this code when ST encoding for histograms is implemented.
expectedMixEncoding = []chunkenc.Encoding{noSTenc}
}
testCases := []struct {
name string
sts []int64
expectedEncoding []chunkenc.Encoding
}{
{
name: "no st",
sts: []int64{0, 0, 0, 0},
expectedEncoding: []chunkenc.Encoding{noSTenc},
},
{
name: "with st",
sts: []int64{1, 1, 1, 1},
expectedEncoding: []chunkenc.Encoding{stEnc},
},
{
name: "mixed st",
sts: []int64{0, 1, 0, 1, 0, 1},
expectedEncoding: expectedMixEncoding,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cOpts := chunkOpts{
chunkRange: int64(1000),
samplesPerChunk: DefaultSamplesPerChunk,
}
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
for i, st := range tc.sts {
var ok bool
switch valueType {
case chunkenc.ValFloat:
ok, _ = s.append(st, int64(i), float64(i), 0, cOpts)
case chunkenc.ValHistogram:
hist := tsdbutil.GenerateTestHistograms(1)[0]
ok, _ = s.appendHistogram(st, int64(i), hist, 0, cOpts)
case chunkenc.ValFloatHistogram:
fhist := tsdbutil.GenerateTestFloatHistograms(1)[0]
ok, _ = s.appendFloatHistogram(st, int64(i), fhist, 0, cOpts)
default:
require.Fail(t, "unsupported value type")
}
require.Truef(t, ok, "append failed at index %d", i)
}
chunks := []*memChunk{}
chk := s.headChunks
for chk != nil {
chunks = append(chunks, chk)
chk = chk.prev
}
slices.Reverse(chunks)
require.Len(t, chunks, len(tc.expectedEncoding), "expected number of chunks")
for i, enc := range tc.expectedEncoding {
require.Equal(t, enc, chunks[i].chunk.Encoding(), "unexpected chunk encoding at index %d", i)
}
})
}
}
func TestMemSeries_appendHistogram(t *testing.T) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
@ -2214,19 +2295,19 @@ func TestMemSeries_appendHistogram(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts)
ok, chunkCreated := s.appendHistogram(0, 998, histograms[0], 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts)
ok, chunkCreated = s.appendHistogram(0, 999, histograms[1], 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts)
ok, chunkCreated = s.appendHistogram(0, 1000, histograms[2], 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts)
ok, chunkCreated = s.appendHistogram(0, 1001, histograms[3], 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -2237,7 +2318,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range")
require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range")
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts)
ok, chunkCreated = s.appendHistogram(0, 1002, histogramWithOneMoreBucket, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
@ -2272,7 +2353,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
var nextTs int64
var totalAppendedSamples int
for i := range samplesPerChunk / 4 {
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
ok, _ := s.append(0, nextTs, float64(i), 0, cOpts)
require.Truef(t, ok, "slow sample %d was not appended", i)
nextTs += slowRate
totalAppendedSamples++
@ -2281,12 +2362,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
// Suddenly, the rate increases and we receive a sample every millisecond.
for i := range math.MaxUint16 {
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
ok, _ := s.append(0, nextTs, float64(i), 0, cOpts)
require.Truef(t, ok, "quick sample %d was not appended", i)
nextTs++
totalAppendedSamples++
}
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts)
ok, chunkCreated := s.append(0, DefaultBlockDuration, float64(0), 0, cOpts)
require.True(t, ok, "new chunk sample was not appended")
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
@ -2315,18 +2396,18 @@ func TestGCChunkAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, cOpts)
ok, chunkCreated := s.append(0, 0, 0, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, cOpts)
ok, chunkCreated = s.append(0, 999, 999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
ok, chunkCreated = s.append(0, 1000, 1000, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
ok, chunkCreated = s.append(0, 1999, 1999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -2371,18 +2452,18 @@ func TestGCSeriesAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, cOpts)
ok, chunkCreated := s.append(0, 0, 0, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, cOpts)
ok, chunkCreated = s.append(0, 999, 999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
ok, chunkCreated = s.append(0, 1000, 1000, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
ok, chunkCreated = s.append(0, 1999, 1999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -2713,10 +2794,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
require.True(t, created, "series was not created")
for i := range 7 {
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
ok, chunkCreated := s.append(0, int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunk was not created")
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
ok, chunkCreated = s.append(0, int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created")
h.chunkDiskMapper.CutNewFile()
@ -3056,7 +3137,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
ok, _ := s.append(0, 0, 0, cOpts)
ok, _ := s.append(0, 0, 0, 0, cOpts)
require.True(t, ok, "Series append failed.")
require.Equal(t, 0, int(s.txs.txIDCount), "Series should not have an appendID after append with appendID=0.")
}
@ -3616,7 +3697,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
for i := range 7 {
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
ok, _ := s.append(0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}

View file

@ -667,7 +667,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
h.numStaleSeries.Dec()
}
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
// TODO(krajorama): Pass ST when available in WAL records.
if _, chunkCreated := ms.append(0, s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
_ = ms.mmapChunks(h.chunkDiskMapper)
@ -704,14 +705,16 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum)
}
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
// TODO(krajorama): Pass ST when available in WAL records.
_, chunkCreated = ms.appendHistogram(0, s.t, s.h, 0, appendChunkOpts)
} else {
newlyStale = value.IsStaleNaN(s.fh.Sum)
if ms.lastFloatHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum)
}
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
// TODO(krajorama): Pass ST when available in WAL records.
_, chunkCreated = ms.appendFloatHistogram(0, s.t, s.fh, 0, appendChunkOpts)
}
if newlyStale {
h.numStaleSeries.Inc()
@ -1097,7 +1100,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
missingSeries[s.Ref] = struct{}{}
continue
}
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
// TODO(krajorama): Pass ST when available in WBL records.
ok, chunkCreated, _ := ms.insert(0, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
@ -1125,9 +1129,11 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
var chunkCreated bool
var ok bool
if s.h != nil {
ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
// TODO(krajorama): Pass ST when available in WBL records.
ok, chunkCreated, _ = ms.insert(0, s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
} else {
ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
// TODO(krajorama): Pass ST when available in WBL records.
ok, chunkCreated, _ = ms.insert(0, s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
}
if chunkCreated {
h.metrics.chunksCreated.Inc()

View file

@ -34,14 +34,13 @@ func NewOOOChunk() *OOOChunk {
// Insert inserts the sample such that order is maintained.
// Returns false if insert was not possible due to the same timestamp already existing.
func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
func (o *OOOChunk) Insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
// Although out-of-order samples can be out-of-order amongst themselves, we
// are opinionated and expect them to be usually in-order meaning we could
// try to append at the end first if the new timestamp is higher than the
// last known timestamp.
if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t {
// TODO(krajorama): pass ST.
o.samples = append(o.samples, sample{0, t, v, h, fh})
o.samples = append(o.samples, sample{st, t, v, h, fh})
return true
}
@ -50,8 +49,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog
if i >= len(o.samples) {
// none found. append it at the end
// TODO(krajorama): pass ST.
o.samples = append(o.samples, sample{0, t, v, h, fh})
o.samples = append(o.samples, sample{st, t, v, h, fh})
return true
}
@ -63,8 +61,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog
// Expand length by 1 to make room. use a zero sample, we will overwrite it anyway.
o.samples = append(o.samples, sample{})
copy(o.samples[i+1:], o.samples[i:])
// TODO(krajorama): pass ST.
o.samples[i] = sample{0, t, v, h, fh}
o.samples[i] = sample{st, t, v, h, fh}
return true
}
@ -97,29 +94,26 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
break
}
encoding := chunkenc.EncXOR
if s.h != nil {
encoding = chunkenc.EncHistogram
} else if s.fh != nil {
switch {
case s.fh != nil:
encoding = chunkenc.EncFloatHistogram
case s.h != nil:
encoding = chunkenc.EncHistogram
case s.st != 0:
encoding = chunkenc.EncXORST
}
// prevApp is the appender for the previous sample.
prevApp := app
if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram
if !prevEncoding.Compatible(encoding) { // For the first sample, this will always be true as EncNone != anything.
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
cmint = s.t
switch encoding {
case chunkenc.EncXOR:
chunk = chunkenc.NewXORChunk()
case chunkenc.EncHistogram:
chunk = chunkenc.NewHistogramChunk()
case chunkenc.EncFloatHistogram:
chunk = chunkenc.NewFloatHistogramChunk()
default:
chunk = chunkenc.NewXORChunk()
chunk, err = chunkenc.NewEmptyChunk(encoding)
if err != nil {
return chks, err
}
app, err = chunk.Appender()
if err != nil {
@ -127,9 +121,14 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
}
}
switch encoding {
case chunkenc.EncXOR:
// TODO(krajorama): pass ST.
app.Append(0, s.t, s.f)
case chunkenc.EncXOR, chunkenc.EncXORST:
var newChunk chunkenc.Chunk
newChunk, app = app.Append(s.st, s.t, s.f)
if newChunk != nil { // A new chunk was allocated.
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
cmint = s.t
chunk = newChunk
}
case chunkenc.EncHistogram:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
@ -137,8 +136,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
newChunk chunkenc.Chunk
recoded bool
)
// TODO(krajorama): pass ST.
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, 0, s.t, s.h, false)
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.st, s.t, s.h, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
@ -153,8 +151,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
newChunk chunkenc.Chunk
recoded bool
)
// TODO(krajorama): pass ST.
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, 0, s.t, s.fh, false)
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.st, s.t, s.fh, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})

View file

@ -85,7 +85,7 @@ func testOOOInsert(t *testing.T,
chunk.samples = make([]sample, numPreExisting)
chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc)
newSample := sampleFunc(valOdd(insertPos))
chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh)
chunk.Insert(0, newSample.t, newSample.f, newSample.h, newSample.fh)
var expSamples []sample
// Our expected new samples slice, will be first the original samples.
@ -131,6 +131,13 @@ func TestOOOInsertDuplicate(t *testing.T) {
t.Run(name, func(t *testing.T) {
testOOOInsertDuplicate(t, scenario.sampleFunc)
})
t.Run(name+"_with_ST", func(t *testing.T) {
testOOOInsertDuplicate(t, func(ts int64) sample {
s := scenario.sampleFunc(ts)
s.st = ts + 1000 // Arbitrary ST to differ from t.
return s
})
})
}
}
@ -144,8 +151,9 @@ func testOOOInsertDuplicate(t *testing.T,
dupSample := chunk.samples[dupPos]
dupSample.f = 0.123
dupSample.st += 10 // Change ST to ensure we are only testing timestamp duplication.
ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh)
ok := chunk.Insert(dupSample.st, dupSample.t, dupSample.f, dupSample.h, dupSample.fh)
expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change.
require.False(t, ok)
@ -241,6 +249,75 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
{encoding: chunkenc.EncHistogram, minTime: 0, maxTime: 1},
},
},
"floats with ST": {
samples: []sample{
{st: 5, t: 1000, f: 43.0},
{st: 1005, t: 1100, f: 42.0},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset},
expectedChunks: []chunkVerify{
{encoding: chunkenc.EncXORST, minTime: 1000, maxTime: 1100},
},
},
"histograms with ST": {
samples: []sample{
{st: 5, t: 1000, h: h1},
{st: 1005, t: 1100, h: h2},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset},
expectedChunks: []chunkVerify{
// TODO(krajorama): Change when ST encoding for histograms is implemented.
{encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1100},
},
},
"float histograms with ST": {
samples: []sample{
{st: 5, t: 1000, fh: tsdbutil.GenerateTestFloatHistogram(1)},
{st: 1005, t: 1100, fh: tsdbutil.GenerateTestFloatHistogram(2)},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset},
expectedChunks: []chunkVerify{
// TODO(krajorama): Change when ST encoding for float histograms is implemented.
{encoding: chunkenc.EncFloatHistogram, minTime: 1000, maxTime: 1100},
},
},
"floats with mixed ST": {
samples: []sample{
{t: 1000, f: 43.0},
{st: 1005, t: 1100, f: 42.0},
{t: 1200, f: 41.0},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset, histogram.UnknownCounterReset},
expectedChunks: []chunkVerify{
{encoding: chunkenc.EncXOR, minTime: 1000, maxTime: 1000},
// Once we switched to XORST encoding, we stay on it.
{encoding: chunkenc.EncXORST, minTime: 1100, maxTime: 1200},
},
},
"histograms with mixed ST": {
samples: []sample{
{t: 1000, h: tsdbutil.GenerateTestHistogram(1)},
{st: 1005, t: 1100, h: tsdbutil.GenerateTestHistogram(2)},
{t: 1200, h: tsdbutil.GenerateTestHistogram(3)},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset},
expectedChunks: []chunkVerify{
// TODO(krajorama): Change when ST encoding for histograms is implemented.
{encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1200},
},
},
"float histograms with mixed ST": {
samples: []sample{
{t: 1000, fh: tsdbutil.GenerateTestFloatHistogram(1)},
{st: 1005, t: 1100, fh: tsdbutil.GenerateTestFloatHistogram(2)},
{t: 1200, fh: tsdbutil.GenerateTestFloatHistogram(3)},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset},
expectedChunks: []chunkVerify{
// TODO(krajorama): Change when ST encoding for float histograms is implemented.
{encoding: chunkenc.EncFloatHistogram, minTime: 1000, maxTime: 1200},
},
},
}
for name, tc := range testCases {
@ -252,11 +329,11 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
for _, s := range tc.samples {
switch s.Type() {
case chunkenc.ValFloat:
oooChunk.Insert(s.t, s.f, nil, nil)
oooChunk.Insert(s.st, s.t, s.f, nil, nil)
case chunkenc.ValHistogram:
oooChunk.Insert(s.t, 0, s.h.Copy(), nil)
oooChunk.Insert(s.st, s.t, 0, s.h.Copy(), nil)
case chunkenc.ValFloatHistogram:
oooChunk.Insert(s.t, 0, nil, s.fh.Copy())
oooChunk.Insert(s.st, s.t, 0, nil, s.fh.Copy())
default:
t.Fatalf("unexpected sample type %d", s.Type())
}
@ -284,6 +361,15 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
// XOR chunks don't have counter reset hints, so we shouldn't expect anything else than UnknownCounterReset.
require.Equal(t, histogram.UnknownCounterReset, tc.expectedCounterResets[sampleIndex+j], "sample reset hint %d", sampleIndex+j)
require.Equal(t, tc.samples[sampleIndex+j].f, s.F(), "sample %d", sampleIndex+j)
require.Equal(t, int64(0), s.ST(), "sample ST %d", sampleIndex+j)
}
case chunkenc.EncXORST:
for j, s := range samples {
require.Equal(t, chunkenc.ValFloat, s.Type())
// XOR chunks don't have counter reset hints, so we shouldn't expect anything else than UnknownCounterReset.
require.Equal(t, histogram.UnknownCounterReset, tc.expectedCounterResets[sampleIndex+j], "sample reset hint %d", sampleIndex+j)
require.Equal(t, tc.samples[sampleIndex+j].f, s.F(), "sample %d", sampleIndex+j)
require.Equal(t, tc.samples[sampleIndex+j].st, s.ST(), "sample ST %d", sampleIndex+j)
}
case chunkenc.EncHistogram:
for j, s := range samples {

View file

@ -867,7 +867,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
// populateCurrForSingleChunk sets the fields within p.currMetaWithChunk. This
// should be called if the samples in p.currDelIter only form one chunk.
// TODO(krajorama): test ST when chunks support it.
func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
valueType := p.currDelIter.Next()
if valueType == chunkenc.ValNone {
@ -906,7 +905,11 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
}
}
case chunkenc.ValFloat:
newChunk = chunkenc.NewXORChunk()
if p.currMeta.Chunk.Encoding() == chunkenc.EncXOR {
newChunk = chunkenc.NewXORChunk()
} else {
newChunk = chunkenc.NewXORSTChunk()
}
if app, err = newChunk.Appender(); err != nil {
break
}
@ -918,7 +921,11 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
var v float64
t, v = p.currDelIter.At()
st = p.currDelIter.AtST()
app.Append(st, t, v)
newNewChunk, _ := app.Append(st, t, v)
if newNewChunk != nil {
err = errors.New("unexpected chunk split when re-encoding float chunk")
break
}
}
case chunkenc.ValFloatHistogram:
newChunk = chunkenc.NewFloatHistogramChunk()
@ -959,7 +966,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
// populateChunksFromIterable reads the samples from currDelIter to create
// chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first
// chunk.
// TODO(krajorama): test ST when chunks support it.
func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
p.chunksFromIterable = p.chunksFromIterable[:0]
p.chunksFromIterableIdx = -1
@ -983,15 +989,17 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
app chunkenc.Appender
newChunk chunkenc.Chunk
recoded bool
err error
)
prevValueType := chunkenc.ValNone
for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() {
var (
newChunk chunkenc.Chunk
recoded bool
)
// Check if the encoding has changed (i.e. we need to create a new
// chunk as chunks can't have multiple encoding types).
// For the first sample, the following condition will always be true as
@ -1015,7 +1023,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
var v float64
t, v = p.currDelIter.At()
st = p.currDelIter.AtST()
app.Append(st, t, v)
newChunk, app = app.Append(st, t, v)
}
case chunkenc.ValHistogram:
{

View file

@ -160,7 +160,8 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
for _, smpl := range chk {
require.Nil(t, smpl.h, "chunk can only contain one type of sample")
require.Nil(t, smpl.fh, "chunk can only contain one type of sample")
app.Append(0, smpl.t, smpl.f)
// Not checking for new chunk as we supply ST==0 always.
_, _ = app.Append(0, smpl.t, smpl.f)
}
chkReader[chunkRef] = chunk
}
@ -2100,7 +2101,8 @@ func TestDeletedIterator(t *testing.T) {
for i := range 1000 {
act[i].t = int64(i)
act[i].f = rand.Float64()
app.Append(0, act[i].t, act[i].f)
// Not checking for new chunk as we supply ST==0 always.
_, _ = app.Append(0, act[i].t, act[i].f)
}
cases := []struct {
@ -2160,7 +2162,8 @@ func TestDeletedIterator_WithSeek(t *testing.T) {
for i := range 1000 {
act[i].t = int64(i)
act[i].f = float64(i)
app.Append(0, act[i].t, act[i].f)
// Not checking for new chunk as we supply ST==0 always.
_, _ = app.Append(0, act[i].t, act[i].f)
}
cases := []struct {