From 8f6d5dcd4516ec9ef99b079a6f8b786b91534995 Mon Sep 17 00:00:00 2001 From: Abhijit Mukherjee Date: Thu, 16 Mar 2023 15:53:47 +0530 Subject: [PATCH] Fix: getting rid of EncOOOXOR chunk encoding (#12111) Signed-off-by: mabhi --- tsdb/chunkenc/chunk.go | 19 +++-------- tsdb/chunkenc/xor.go | 9 ----- tsdb/chunks/chunk_write_queue.go | 5 +-- tsdb/chunks/chunk_write_queue_test.go | 10 +++--- tsdb/chunks/head_chunks.go | 49 +++++++++++++++++++++------ tsdb/chunks/head_chunks_test.go | 25 ++++++++------ tsdb/head.go | 3 +- tsdb/head_append.go | 5 ++- tsdb/head_test.go | 4 +-- 9 files changed, 70 insertions(+), 59 deletions(-) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index b7d240123b..c550cbc78e 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -47,20 +47,9 @@ func (e Encoding) String() string { return "" } -// Chunk encodings for out-of-order chunks. -// These encodings must be only used by the Head block for its internal bookkeeping. -const ( - OutOfOrderMask = 0b10000000 - EncOOOXOR = EncXOR | OutOfOrderMask -) - -func IsOutOfOrderChunk(e Encoding) bool { - return (e & OutOfOrderMask) != 0 -} - // IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { - return e == EncXOR || e == EncOOOXOR || e == EncHistogram || e == EncFloatHistogram + return e == EncXOR || e == EncHistogram || e == EncFloatHistogram } // Chunk holds a sequence of sample pairs that can be iterated over and appended to. @@ -262,7 +251,7 @@ func NewPool() Pool { func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { switch e { - case EncXOR, EncOOOXOR: + case EncXOR: c := p.xor.Get().(*XORChunk) c.b.stream = b c.b.count = 0 @@ -283,7 +272,7 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { func (p *pool) Put(c Chunk) error { switch c.Encoding() { - case EncXOR, EncOOOXOR: + case EncXOR: xc, ok := c.(*XORChunk) // This may happen often with wrapped chunks. Nothing we can really do about // it but returning an error would cause a lot of allocations again. Thus, @@ -327,7 +316,7 @@ func (p *pool) Put(c Chunk) error { // bytes. func FromData(e Encoding, d []byte) (Chunk, error) { switch e { - case EncXOR, EncOOOXOR: + case EncXOR: return &XORChunk{b: bstream{count: 0, stream: d}}, nil case EncHistogram: return &HistogramChunk{b: bstream{count: 0, stream: d}}, nil diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 62e90cbaae..2fa2f613cb 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -506,12 +506,3 @@ func xorRead(br *bstreamReader, value *float64, leading, trailing *uint8) error *value = math.Float64frombits(vbits) return nil } - -// OOOXORChunk holds a XORChunk and overrides the Encoding() method. -type OOOXORChunk struct { - *XORChunk -} - -func (c *OOOXORChunk) Encoding() Encoding { - return EncOOOXOR -} diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index ab34eb06c7..6d2dc743b0 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -42,6 +42,7 @@ type chunkWriteJob struct { maxt int64 chk chunkenc.Chunk ref ChunkDiskMapperRef + isOOO bool callback func(error) } @@ -76,7 +77,7 @@ type chunkWriteQueue struct { } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. -type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool) error +type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue { counters := prometheus.NewCounterVec( @@ -133,7 +134,7 @@ func (c *chunkWriteQueue) start() { } func (c *chunkWriteQueue) processJob(job chunkWriteJob) { - err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.cutFile) + err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.isOOO, job.cutFile) if job.callback != nil { job.callback(err) } diff --git a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go index a55896a6d6..c908d47f5b 100644 --- a/tsdb/chunks/chunk_write_queue_test.go +++ b/tsdb/chunks/chunk_write_queue_test.go @@ -31,7 +31,7 @@ func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) { blockWriterWg.Add(1) // blockingChunkWriter blocks until blockWriterWg is done. - blockingChunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error { + blockingChunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _, _ bool) error { blockWriterWg.Wait() return nil } @@ -63,7 +63,7 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) { gotCutFile bool ) - blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error { + blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error { gotSeriesRef = seriesRef gotMint = mint gotMaxt = maxt @@ -101,7 +101,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { unblockChunkWriterCh := make(chan struct{}, sizeLimit) // blockingChunkWriter blocks until the unblockChunkWriterCh channel returns a value. - blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error { + blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error { <-unblockChunkWriterCh return nil } @@ -184,7 +184,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { testError := errors.New("test error") - chunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error { + chunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _, _ bool) error { return testError } @@ -212,7 +212,7 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) { for _, concurrentWrites := range []int{1, 10, 100, 1000} { b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) { issueReadSignal := make(chan struct{}) - q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, b bool) error { + q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, ooo, b bool) error { if withReads { select { case issueReadSignal <- struct{}{}: diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index a0bd735b8b..a7ff90475e 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -273,6 +273,26 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo return m, m.openMMapFiles() } +// Chunk encodings for out-of-order chunks. +// These encodings must be only used by the Head block for its internal bookkeeping. +const ( + OutOfOrderMask = uint8(0b10000000) +) + +func (cdm *ChunkDiskMapper) ApplyOutOfOrderMask(sourceEncoding chunkenc.Encoding) chunkenc.Encoding { + enc := uint8(sourceEncoding) | OutOfOrderMask + return chunkenc.Encoding(enc) +} + +func (cdm *ChunkDiskMapper) IsOutOfOrderChunk(e chunkenc.Encoding) bool { + return (uint8(e) & OutOfOrderMask) != 0 +} + +func (cdm *ChunkDiskMapper) RemoveMasks(sourceEncoding chunkenc.Encoding) chunkenc.Encoding { + restored := uint8(sourceEncoding) & (^OutOfOrderMask) + return chunkenc.Encoding(restored) +} + // openMMapFiles opens all files within dir for mmapping. func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} @@ -403,17 +423,17 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro // WriteChunk writes the chunk to the disk. // The returned chunk ref is the reference from where the chunk encoding starts for the chunk. -func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { +func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) { // cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue). cdm.evtlPosMtx.Lock() defer cdm.evtlPosMtx.Unlock() ref, cutFile := cdm.evtlPos.getNextChunkRef(chk) if cdm.writeQueue != nil { - return cdm.writeChunkViaQueue(ref, cutFile, seriesRef, mint, maxt, chk, callback) + return cdm.writeChunkViaQueue(ref, isOOO, cutFile, seriesRef, mint, maxt, chk, callback) } - err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, cutFile) + err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile) if callback != nil { callback(err) } @@ -421,7 +441,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64 return ref } -func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { +func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { var err error if callback != nil { defer func() { @@ -438,13 +458,14 @@ func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, cutFile b maxt: maxt, chk: chk, ref: ref, + isOOO: isOOO, callback: callback, }) return ref } -func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) (err error) { +func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) (err error) { cdm.writePathMtx.Lock() defer cdm.writePathMtx.Unlock() @@ -476,7 +497,11 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64 bytesWritten += MintMaxtSize binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt)) bytesWritten += MintMaxtSize - cdm.byteBuf[bytesWritten] = byte(chk.Encoding()) + enc := chk.Encoding() + if isOOO { + enc = cdm.ApplyOutOfOrderMask(enc) + } + cdm.byteBuf[bytesWritten] = byte(enc) bytesWritten += ChunkEncodingSize n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes()))) bytesWritten += n @@ -696,7 +721,9 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error // Encoding. chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0] - + sourceChkEnc := chunkenc.Encoding(chkEnc) + // Extract the encoding from the byte. ChunkDiskMapper uses only the last 7 bits for the encoding. + chkEnc = byte(cdm.RemoveMasks(sourceChkEnc)) // Data length. // With the minimum chunk length this should never cause us reading // over the end of the slice. @@ -762,7 +789,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error // and runs the provided function with information about each chunk. It returns on the first error encountered. // NOTE: This method needs to be called at least once after creating ChunkDiskMapper // to set the maxt of all the file. -func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) { +func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error) (err error) { cdm.writePathMtx.Lock() defer cdm.writePathMtx.Unlock() @@ -860,8 +887,10 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu if maxt > mmapFile.maxt { mmapFile.maxt = maxt } - - if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil { + isOOO := cdm.IsOutOfOrderChunk(chkEnc) + // Extract the encoding from the byte. ChunkDiskMapper uses only the last 7 bits for the encoding. + chkEnc = cdm.RemoveMasks(chkEnc) + if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc, isOOO); err != nil { if cerr, ok := err.(*CorruptionErr); ok { cerr.Dir = cdm.dir.Name() cerr.FileIndex = segID diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index 0b5bc460d2..ac89ae3e59 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -98,7 +98,11 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { bytesWritten += MintMaxtSize binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) bytesWritten += MintMaxtSize - buf[bytesWritten] = byte(chunk.Encoding()) + enc := chunk.Encoding() + if isOOO { + enc = hrw.ApplyOutOfOrderMask(enc) + } + buf[bytesWritten] = byte(enc) bytesWritten += ChunkEncodingSize n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) bytesWritten += n @@ -149,7 +153,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { hrw = createChunkDiskMapper(t, dir) idx := 0 - require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { + require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error { t.Helper() expData := expectedData[idx] @@ -158,7 +162,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { require.Equal(t, expData.maxt, maxt) require.Equal(t, expData.maxt, maxt) require.Equal(t, expData.numSamples, numSamples) - require.Equal(t, expData.isOOO, chunkenc.IsOutOfOrderChunk(encoding)) + require.Equal(t, expData.isOOO, isOOO) actChunk, err := hrw.Chunk(expData.chunkRef) require.NoError(t, err) @@ -188,7 +192,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { mint, maxt := timeRange+1, timeRange+step-1 var err error awaitCb := make(chan struct{}) - hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) { + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) { err = cbErr close(awaitCb) }) @@ -282,7 +286,7 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { step := 100 mint, maxt := timeRange+1, timeRange+step-1 - hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(err error) { close(awaitCb) require.NoError(t, err) }) @@ -363,7 +367,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { // Write a chunks to iterate on it later. var err error awaitCb := make(chan struct{}) - hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(cbErr error) { + hrw.WriteChunk(1, 0, 1000, randomChunk(t), false, func(cbErr error) { err = cbErr close(awaitCb) }) @@ -377,7 +381,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { hrw = createChunkDiskMapper(t, dir) // Forcefully failing IterateAllChunks. - require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { + require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error { return errors.New("random error") })) @@ -396,7 +400,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { mint, maxt := timeRange+1, timeRange+step-1 var err error awaitCb := make(chan struct{}) - hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) { + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) { err = cbErr close(awaitCb) }) @@ -489,7 +493,7 @@ func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper { hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { + require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error { return nil })) require.True(t, hrw.fileMaxtSet) @@ -517,9 +521,8 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer awaitCb := make(chan struct{}) if rand.Intn(2) == 0 { isOOO = true - chunk = &chunkenc.OOOXORChunk{XORChunk: chunk.(*chunkenc.XORChunk)} } - chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { + chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, isOOO, func(cbErr error) { require.NoError(t, err) close(awaitCb) }) diff --git a/tsdb/head.go b/tsdb/head.go index b28f5aca5e..b5239bdf84 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -784,10 +784,9 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) mmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{} oooMmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{} var lastRef, secondLastRef chunks.ChunkDiskMapperRef - if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { + if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error { secondLastRef = lastRef lastRef = chunkRef - isOOO := chunkenc.IsOutOfOrderChunk(encoding) if !isOOO && maxt < h.minValidTime.Load() { return nil } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 8a622fafe5..e3beaae17b 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1453,8 +1453,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap return 0 } xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality. - oooXor := &chunkenc.OOOXORChunk{XORChunk: xor} - chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, oooXor, handleChunkWriteError) + chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, xor, true, handleChunkWriteError) s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ ref: chunkRef, numSamples: uint16(xor.NumSamples()), @@ -1471,7 +1470,7 @@ func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper return } - chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, handleChunkWriteError) + chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, false, handleChunkWriteError) s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ ref: chunkRef, numSamples: uint16(s.headChunk.chunk.NumSamples()), diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b5afed64b3..1a0558cce3 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -69,7 +69,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) ( h, err := NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) - require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { + require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error { return nil })) @@ -4177,7 +4177,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { uc := newUnsupportedChunk() // Make this chunk not overlap with the previous and the next - h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, func(err error) { require.NoError(t, err) }) + h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false, func(err error) { require.NoError(t, err) }) app = h.Appender(ctx) for i := 700; i < 1200; i++ {