tests: Avoid deadlocks on DB closes with chunk mapper panics

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-02-04 10:47:05 +00:00
parent 7769495a4a
commit 5384814a74
11 changed files with 240 additions and 326 deletions

View file

@ -206,6 +206,8 @@ type AppenderTransaction interface {
// This is to support migration to AppenderV2.
// TODO(bwplotka): Remove once migration to AppenderV2 is fully complete.
type LimitedAppenderV1 interface {
AppenderTransaction
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
}

View file

@ -43,7 +43,6 @@ type chunkWriteJob struct {
chk chunkenc.Chunk
ref ChunkDiskMapperRef
isOOO bool
callback func(error)
}
// chunkWriteQueue is a queue for writing chunks to disk in a non-blocking fashion.
@ -77,7 +76,7 @@ type chunkWriteQueue struct {
}
// writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests.
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool)
func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue {
counters := prometheus.NewCounterVec(
@ -131,10 +130,7 @@ func (c *chunkWriteQueue) start() {
}
func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.isOOO, job.cutFile)
if job.callback != nil {
job.callback(err)
}
c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.isOOO, job.cutFile)
c.chunkRefMapMtx.Lock()
defer c.chunkRefMapMtx.Unlock()

View file

@ -14,7 +14,6 @@
package chunks
import (
"errors"
"fmt"
"sync"
"testing"
@ -31,9 +30,8 @@ func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) {
blockWriterWg.Add(1)
// blockingChunkWriter blocks until blockWriterWg is done.
blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error {
blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) {
blockWriterWg.Wait()
return nil
}
q := newChunkWriteQueue(nil, 1000, blockingChunkWriter)
@ -63,14 +61,15 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) {
gotCutFile bool
)
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, _, cutFile bool) error {
awaitCb := make(chan struct{})
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, _, cutFile bool) {
gotSeriesRef = seriesRef
gotMint = mint
gotMaxt = maxt
gotChunk = chunk
gotRef = ref
gotCutFile = cutFile
return nil
close(awaitCb)
}
q := newChunkWriteQueue(nil, 1000, blockingChunkWriter)
@ -81,10 +80,7 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) {
chunk := chunkenc.NewXORChunk()
ref := newChunkDiskMapperRef(321, 123)
cutFile := true
awaitCb := make(chan struct{})
require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile, callback: func(error) {
close(awaitCb)
}}))
require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile}))
<-awaitCb
// Compare whether the write function has received all job attributes correctly.
@ -100,27 +96,24 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
sizeLimit := 100
unblockChunkWriterCh := make(chan struct{}, sizeLimit)
var callbackWg sync.WaitGroup
// blockingChunkWriter blocks until the unblockChunkWriterCh channel returns a value.
blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error {
blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) {
<-unblockChunkWriterCh
return nil
callbackWg.Done()
}
q := newChunkWriteQueue(nil, sizeLimit, blockingChunkWriter)
defer q.stop()
// Unblock writers when shutting down.
defer close(unblockChunkWriterCh)
var chunkRef ChunkDiskMapperRef
var callbackWg sync.WaitGroup
addChunk := func() {
callbackWg.Add(1)
require.NoError(t, q.addJob(chunkWriteJob{
ref: chunkRef,
callback: func(error) {
callbackWg.Done()
},
}))
require.NoError(t, q.addJob(chunkWriteJob{ref: chunkRef}))
chunkRef++
}
@ -182,37 +175,13 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
require.Eventually(t, q.queueIsEmpty, 500*time.Millisecond, 50*time.Millisecond)
}
func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
testError := errors.New("test error")
chunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error {
return testError
}
awaitCb := make(chan struct{})
var gotError error
callback := func(err error) {
gotError = err
close(awaitCb)
}
q := newChunkWriteQueue(nil, 1, chunkWriter)
defer q.stop()
job := chunkWriteJob{callback: callback}
require.NoError(t, q.addJob(job))
<-awaitCb
require.Equal(t, testError, gotError)
}
func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
for _, withReads := range []bool{false, true} {
b.Run(fmt.Sprintf("with reads %t", withReads), func(b *testing.B) {
for _, concurrentWrites := range []int{1, 10, 100, 1000} {
b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) {
issueReadSignal := make(chan struct{})
q := newChunkWriteQueue(nil, 1000, func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error {
q := newChunkWriteQueue(nil, 1000, func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) {
if withReads {
select {
case issueReadSignal <- struct{}{}:
@ -220,7 +189,6 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
// Can't write to issueReadSignal, don't block but omit read instead.
}
}
return nil
})
b.Cleanup(func() {
// Stopped already, so no more writes will happen.

View file

@ -26,9 +26,11 @@ import (
"slices"
"strconv"
"sync"
"testing"
"github.com/dennwc/varint"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -209,6 +211,9 @@ type ChunkDiskMapper struct {
chkWriter *bufio.Writer // Writer for the current open file.
crc32 hash.Hash
writePathMtx sync.Mutex
// asyncWrites is useful for tests to synchronize queue-based async writes.
// Locked with writePathMtx.
asyncWrites int64
// Reader.
// The int key in the map is the file number on the disk.
@ -228,7 +233,10 @@ type ChunkDiskMapper struct {
writeQueue *chunkWriteQueue
closed bool
// TODO(bwplotka): async err handling for queued writes. Consider better error handling.
// Current prod async handleAsyncErr panics on errors other than closed.
testAsyncHandleWriteChunkErr func(error)
closed bool
}
// mmappedChunkFile provides mmap access to an entire head chunks file that holds many chunks.
@ -267,7 +275,19 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo
}
if writeQueueSize > 0 {
m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk)
m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, func(
seriesRef HeadSeriesRef,
mint int64,
maxt int64,
chk chunkenc.Chunk,
ref ChunkDiskMapperRef,
isOOO bool,
cutFile bool,
) {
if err := m.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile); err != nil {
m.asyncHandleWriteChunkErr(err)
}
})
}
if m.pool == nil {
@ -277,6 +297,36 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo
return m, m.openMMapFiles()
}
func (cdm *ChunkDiskMapper) SetTestAsyncHandleWriteChunkErr(fn func(error)) {
cdm.testAsyncHandleWriteChunkErr = fn
}
func (cdm *ChunkDiskMapper) getAsyncWrites() int64 {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
return cdm.asyncWrites
}
// asyncHandleWriteChunkErr allows async custom error handling for chunk write errors. It's async because
// some work is queued and done asynchronously.
//
// TODO(bwplotka): Don't be lazy. Propagate errors properly and return in some way (e.g. error channel),
// don't panic. We see those panics in tests on slow CIs and especially in tests, the panic then deadlocks DB closes
// (various locks are unlocked).
func (cdm *ChunkDiskMapper) asyncHandleWriteChunkErr(err error) {
if cdm.testAsyncHandleWriteChunkErr != nil {
cdm.testAsyncHandleWriteChunkErr(err)
return
}
if testing.Testing() {
panic("ensure testAsyncHandleWriteChunkErr is always mocked for clean test failures")
}
if errors.Is(err, ErrChunkDiskMapperClosed) {
return
}
panic(err)
}
// Chunk encodings for out-of-order chunks.
// These encodings must be only used by the Head block for its internal bookkeeping.
const (
@ -454,33 +504,29 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
// WriteChunk writes the chunk to disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) {
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool) (chkRef ChunkDiskMapperRef) {
// cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue).
cdm.evtlPosMtx.Lock()
defer cdm.evtlPosMtx.Unlock()
ref, cutFile := cdm.evtlPos.getNextChunkRef(chk)
if cdm.writeQueue != nil {
return cdm.writeChunkViaQueue(ref, isOOO, cutFile, seriesRef, mint, maxt, chk, callback)
return cdm.writeChunkViaQueue(ref, isOOO, cutFile, seriesRef, mint, maxt, chk)
}
err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile)
if callback != nil {
callback(err)
if err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile); err != nil {
cdm.asyncHandleWriteChunkErr(err)
}
return ref
}
func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef) {
var err error
if callback != nil {
defer func() {
if err != nil {
callback(err)
}
}()
}
defer func() {
if err != nil {
cdm.asyncHandleWriteChunkErr(err)
}
}()
err = cdm.writeQueue.addJob(chunkWriteJob{
cutFile: cutFile,
@ -490,7 +536,6 @@ func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cu
chk: chk,
ref: ref,
isOOO: isOOO,
callback: callback,
})
return ref
@ -560,7 +605,7 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64
return err
}
}
cdm.asyncWrites++
return nil
}
@ -1156,3 +1201,28 @@ func (cb *chunkBuffer) clear() {
cb.inBufferChunksMtxs[i].Unlock()
}
}
// NewTestChunkDiskMapper returns new chunk mapper for testing purposes.
// This is mostly needed for rigorous async error handling until we get rid of panics.
func NewTestChunkDiskMapper(t testing.TB, dir string, writeQueueSize int) *ChunkDiskMapper {
hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, hrw.Close())
})
// TODO(bwplotka): At the moment normal async handling panics. This can get the tests stuck (recovery + further close
// with unlocked mutexes). Replace panics with something more robust, but for now handle those (expect no error)
// in tests.
hrw.testAsyncHandleWriteChunkErr = func(err error) {
t.Helper()
t.Error(err)
}
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(HeadSeriesRef, ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
return hrw
}

View file

@ -19,18 +19,20 @@ import (
"math/rand"
"os"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/testutil/synctest"
)
var writeQueueSize int
func TestMain(m *testing.M) {
// TODO(bwplotka): This is too hidden; consider moving to explicit subtests.
// Run all tests with the chunk write queue disabled.
writeQueueSize = 0
exitVal := m.Run()
@ -44,11 +46,18 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}
func syncWrite(t testing.TB, hrw *ChunkDiskMapper, writeFn func()) {
t.Helper()
before := hrw.getAsyncWrites()
writeFn()
require.Eventually(t, func() bool {
return hrw.getAsyncWrites() > before
}, 1*time.Minute, 10*time.Millisecond)
}
func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize)
expectedBytes := []byte{}
nextChunkOffset := uint64(HeadChunkFileHeaderSize)
@ -152,7 +161,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
// Testing IterateAllChunks method.
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
hrw = createChunkDiskMapper(t, dir)
hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize)
idx := 0
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, _, maxt int64, numSamples uint16, _ chunkenc.Encoding, isOOO bool) error {
@ -181,10 +190,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
// * Non-empty current file leads to creation of another file after truncation.
func TestChunkDiskMapper_Truncate(t *testing.T) {
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize)
timeRange := 0
addChunk := func() {
@ -192,14 +198,9 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) {
err = cbErr
close(awaitCb)
syncWrite(t, hrw, func() {
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false)
})
<-awaitCb
require.NoError(t, err)
timeRange += step
}
@ -237,7 +238,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
require.NoError(t, hrw.Close())
// Restarted.
hrw = createChunkDiskMapper(t, dir)
hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize)
verifyFiles([]int{3, 4, 5, 6, 7, 8})
// New file is created after restart even if last file was empty.
@ -275,24 +276,18 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize)
timeRange := 0
addChunk := func() {
t.Helper()
awaitCb := make(chan struct{})
step := 100
mint, maxt := timeRange+1, timeRange+step-1
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(err error) {
close(awaitCb)
require.NoError(t, err)
syncWrite(t, hrw, func() {
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false)
})
<-awaitCb
timeRange += step
}
@ -354,83 +349,76 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting checks for unsequential files.
hrw = createChunkDiskMapper(t, dir)
hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize)
verifyFiles([]int{5, 6, 7})
}
func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) {
hrw := createChunkDiskMapper(t, "")
t.Cleanup(func() {
require.NoError(t, hrw.Close())
})
hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize)
// This test should only run when the queue is enabled.
if hrw.writeQueue == nil {
t.Skip("This test should only run when the queue is enabled")
}
// Add an artificial delay in the writeChunk function to easily trigger the race condition.
origWriteChunk := hrw.writeQueue.writeChunk
hrw.writeQueue.writeChunk = func(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error {
time.Sleep(100 * time.Millisecond)
return origWriteChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile)
}
synctest.Test(t, func(t *testing.T) {
// Add an artificial delay in the writeChunk function to trigger the race condition.
origWriteChunk := hrw.writeQueue.writeChunk
hrw.writeQueue.writeChunk = func(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) {
time.Sleep(100 * time.Millisecond)
origWriteChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile)
}
wg := sync.WaitGroup{}
wg.Add(2)
// Write a chunk. Since the queue is enabled, the chunk will be written asynchronously with our writeQueue
// that adds the artificial delay.
ref := hrw.WriteChunk(1, 0, 10, randomChunk(t), false)
// Write a chunk. Since the queue is enabled, the chunk will be written asynchronously (with the artificial delay).
ref := hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) {
defer wg.Done()
require.NoError(t, err)
seq, _ := ref.Unpack()
require.Equal(t, 1, seq)
// Truncate, simulating that all chunks from segment files before 1 can be dropped.
require.NoError(t, hrw.Truncate(1))
// Unblock first write.
synctest.Wait()
// Request to cut a new file when writing the next chunk. If there's a race condition, cutting a new file will
// allow us to detect there's actually an issue with the sequence number (because it's checked when a new segment
// file is created).
hrw.CutNewFile()
// Write another chunk. This will cut a new file.
ref = hrw.WriteChunk(1, 0, 10, randomChunk(t), false)
seq, _ = ref.Unpack()
require.Equal(t, 2, seq)
// Unblock second write.
synctest.Wait()
})
seq, _ := ref.Unpack()
require.Equal(t, 1, seq)
// Truncate, simulating that all chunks from segment files before 1 can be dropped.
require.NoError(t, hrw.Truncate(1))
// Request to cut a new file when writing the next chunk. If there's a race condition, cutting a new file will
// allow us to detect there's actually an issue with the sequence number (because it's checked when a new segment
// file is created).
hrw.CutNewFile()
// Write another chunk. This will cut a new file.
ref = hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) {
defer wg.Done()
require.NoError(t, err)
})
seq, _ = ref.Unpack()
require.Equal(t, 2, seq)
wg.Wait()
// Ensure writes eventually happened.
require.Eventually(t, func() bool {
return hrw.getAsyncWrites() == 2
}, 1*time.Minute, 10*time.Millisecond)
}
// TestHeadReadWriter_TruncateAfterFailedIterateChunks tests for
// https://github.com/prometheus/prometheus/issues/7753
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize)
// Write a chunks to iterate on it later.
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, 0, 1000, randomChunk(t), false, func(cbErr error) {
err = cbErr
close(awaitCb)
syncWrite(t, hrw, func() {
hrw.WriteChunk(1, 0, 1000, randomChunk(t), false)
})
<-awaitCb
require.NoError(t, err)
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
hrw = createChunkDiskMapper(t, dir)
hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize)
// Forcefully failing IterateAllChunks.
require.Error(t, hrw.IterateAllChunks(func(HeadSeriesRef, ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error {
@ -442,7 +430,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
}
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
hrw := createChunkDiskMapper(t, "")
hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize)
timeRange := 0
addChunk := func() {
@ -450,14 +438,11 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) {
err = cbErr
close(awaitCb)
// Write a chunks to iterate on it later.
syncWrite(t, hrw, func() {
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false)
})
<-awaitCb
require.NoError(t, err)
timeRange += step
}
nonEmptyFile := func() {
@ -497,7 +482,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
checkRepair := func() {
// Open chunk disk mapper again, corrupt file should be removed.
hrw = createChunkDiskMapper(t, dir)
hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize)
// Removed from memory.
require.Len(t, hrw.mmappedChunkFiles, 3)
@ -537,22 +522,6 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
checkRepair()
}
func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper {
if dir == "" {
dir = t.TempDir()
}
hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(HeadSeriesRef, ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
return hrw
}
func randomChunk(t *testing.T) chunkenc.Chunk {
chunk := chunkenc.NewXORChunk()
length := rand.Int() % 120
@ -565,19 +534,17 @@ func randomChunk(t *testing.T) chunkenc.Chunk {
}
func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk, isOOO bool) {
var err error
seriesRef = HeadSeriesRef(rand.Int63())
mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000)
chunk = randomChunk(t)
awaitCb := make(chan struct{})
if rand.Intn(2) == 0 {
isOOO = true
}
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, isOOO, func(error) {
require.NoError(t, err)
close(awaitCb)
syncWrite(t, hrw, func() {
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, isOOO)
})
<-awaitCb
return seriesRef, chunkRef, mint, maxt, chunk, isOOO
}

View file

@ -7049,97 +7049,6 @@ func testPanicOnApplyConfigAppendV2(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, err)
}
func TestDiskFillingUpAfterDisablingOOO_AppendV2(t *testing.T) {
t.Parallel()
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testDiskFillingUpAfterDisablingOOOAppenderV2(t, scenario)
})
}
}
func testDiskFillingUpAfterDisablingOOOAppenderV2(t *testing.T, scenario sampleTypeScenario) {
t.Parallel()
ctx := context.Background()
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds()
db := newTestDB(t, withOpts(opts))
db.DisableCompactions()
series1 := labels.FromStrings("foo", "bar1")
var allSamples []chunks.Sample
addSamples := func(fromMins, toMins int64) {
app := db.AppenderV2(context.Background())
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, s, err := scenario.appendFunc(storage.AppenderV2AsLimitedV1(app), series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, s)
}
require.NoError(t, app.Commit())
}
// In-order samples.
addSamples(290, 300)
// OOO samples.
addSamples(250, 299)
// Restart DB with OOO disabled.
require.NoError(t, db.Close())
opts.OutOfOrderTimeWindow = 0
db = newTestDB(t, withDir(db.Dir()), withOpts(opts))
db.DisableCompactions()
ms := db.head.series.getByHash(series1.Hash(), series1)
require.NotEmpty(t, ms.ooo.oooMmappedChunks, "OOO mmap chunk was not replayed")
checkMmapFileContents := func(contains, notContains []string) {
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
files, err := os.ReadDir(mmapDir)
require.NoError(t, err)
fnames := make([]string, 0, len(files))
for _, f := range files {
fnames = append(fnames, f.Name())
}
for _, f := range contains {
require.Contains(t, fnames, f)
}
for _, f := range notContains {
require.NotContains(t, fnames, f)
}
}
// Add in-order samples until ready for compaction..
addSamples(301, 500)
// Check that m-map files gets deleted properly after compactions.
db.head.mmapHeadChunks()
checkMmapFileContents([]string{"000001", "000002"}, nil)
require.NoError(t, db.Compact(ctx))
checkMmapFileContents([]string{"000002"}, []string{"000001"})
require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted")
addSamples(501, 650)
db.head.mmapHeadChunks()
checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})
require.NoError(t, db.Compact(ctx))
checkMmapFileContents(nil, []string{"000001", "000002", "000003"})
// Verify that WBL is empty.
files, err := os.ReadDir(db.head.wbl.Dir())
require.NoError(t, err)
require.Len(t, files, 1) // Last empty file after compaction.
finfo, err := files[0].Info()
require.NoError(t, err)
require.Equal(t, int64(0), finfo.Size())
}
func TestHistogramAppendAndQuery_AppendV2(t *testing.T) {
t.Run("integer histograms", func(t *testing.T) {
testHistogramAppendAndQueryHelperAppendV2(t, false)

View file

@ -126,6 +126,15 @@ func newTestDB(t testing.TB, opts ...testDBOpt) (db *DB) {
db, err = open(o.dir, nil, nil, o.opts, o.rngs, nil)
}
require.NoError(t, err)
db.Head().chunkDiskMapper.SetTestAsyncHandleWriteChunkErr(func(err error) {
// TODO(bwplotka): At the moment normal async handling panics. This can get the tests stuck (recovery + further close
// with unlocked mutexes). Replace panics with something more robust, but for now handle those (expect no error)
// in tests.
t.Helper()
t.Error(err)
})
t.Cleanup(func() {
// Always close. DB is safe for close-after-close.
require.NoError(t, db.Close())
@ -8304,16 +8313,22 @@ func testPanicOnApplyConfig(t *testing.T, scenario sampleTypeScenario) {
func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
t.Parallel()
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testDiskFillingUpAfterDisablingOOO(t, scenario)
})
for _, appV2 := range []bool{true, false} {
for name, scenario := range sampleTypeScenarios {
t.Run(fmt.Sprintf("sample=%v/appV2=%v", name, appV2), func(t *testing.T) {
testDiskFillingUpAfterDisablingOOO(t, scenario, func(db *DB, ctx context.Context) storage.LimitedAppenderV1 {
if appV2 {
return storage.AppenderV2AsLimitedV1(db.AppenderV2(ctx))
}
return db.Appender(ctx)
})
})
}
}
}
func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenario) {
func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenario, appenderFn func(db *DB, ctx context.Context) storage.LimitedAppenderV1) {
t.Parallel()
ctx := context.Background()
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds()
@ -8321,10 +8336,14 @@ func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenari
db := newTestDB(t, withOpts(opts))
db.DisableCompactions()
series1 := labels.FromStrings("foo", "bar1")
var allSamples []chunks.Sample
var (
ctx = t.Context()
series1 = labels.FromStrings("foo", "bar1")
allSamples []chunks.Sample
)
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
app := appenderFn(db, ctx)
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, s, err := scenario.appendFunc(app, series1, ts, ts)
@ -8367,13 +8386,16 @@ func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenari
}
}
// Add in-order samples until ready for compaction..
// Add in-order samples until ready for compaction.
addSamples(301, 500)
// Check that m-map files gets deleted properly after compactions.
db.head.mmapHeadChunks()
checkMmapFileContents([]string{"000001", "000002"}, nil)
// NOTE: We are investigating flaky errors from this compaction on x383 architecture.
// See https://github.com/prometheus/prometheus/issues/17941#issuecomment-3846381263
require.NoError(t, db.Compact(ctx))
checkMmapFileContents([]string{"000002"}, []string{"000001"})
require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted")

View file

@ -2181,8 +2181,8 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
}
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
if err != nil {
handleChunkWriteError(err)
return nil
// TODO(bwplotka): Propagate error correctly.
panic(err)
}
chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, len(chks))
for _, memchunk := range chks {
@ -2190,7 +2190,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
logger.Error("Too many OOO chunks, dropping data", "series", s.lset.String())
break
}
chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true)
chunkRefs = append(chunkRefs, chunkRef)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
@ -2215,7 +2215,7 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i
// then we need to write chunks t0 to t3, but skip s.headChunks.
for i := s.headChunks.len() - 1; i > 0; i-- {
chk := s.headChunks.atOffset(i)
chunkRef := chunkDiskMapper.WriteChunk(s.ref, chk.minTime, chk.maxTime, chk.chunk, false, handleChunkWriteError)
chunkRef := chunkDiskMapper.WriteChunk(s.ref, chk.minTime, chk.maxTime, chk.chunk, false)
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(chk.chunk.NumSamples()),
@ -2231,12 +2231,6 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i
return count
}
func handleChunkWriteError(err error) {
if err != nil && !errors.Is(err, chunks.ErrChunkDiskMapperClosed) {
panic(err)
}
}
// Rollback removes the samples and exemplars from headAppender and writes any series to WAL.
func (a *headAppenderBase) Rollback() (err error) {
if a.closed {

View file

@ -3124,7 +3124,7 @@ func TestHead_Init_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
uc := newUnsupportedChunk()
// Make this chunk not overlap with the previous and the next
h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false, func(err error) { require.NoError(t, err) })
h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false)
app = h.AppenderV2(ctx)
for i := 700; i < 1200; i++ {

View file

@ -93,6 +93,13 @@ func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *He
_ = h.Close()
})
// TODO(bwplotka): At the moment normal async handling panics. This can get the tests stuck (recovery + further close
// with unlocked mutexes). Replace panics with something more robust, but for now handle those.
h.chunkDiskMapper.SetTestAsyncHandleWriteChunkErr(func(err error) {
t.Helper()
t.Error(err)
})
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(chunks.HeadSeriesRef, chunks.ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error {
return nil
}))
@ -338,8 +345,7 @@ func BenchmarkLoadWLs(b *testing.B) {
// Write mmapped chunks.
if c.mmappedChunkT != 0 {
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(b, err)
chunkDiskMapper := chunks.NewTestChunkDiskMapper(b, mmappedChunksDir(dir), chunks.DefaultWriteQueueSize)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: c.mmappedChunkT,
@ -1467,11 +1473,8 @@ func TestHead_Truncate(t *testing.T) {
func TestMemSeries_truncateChunks(t *testing.T) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: 2000,
@ -1618,12 +1621,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, t.TempDir(), chunks.DefaultWriteQueueSize)
series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
@ -2179,11 +2177,8 @@ func TestComputeChunkEndTime(t *testing.T) {
func TestMemSeries_append(t *testing.T) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: 500,
@ -2240,11 +2235,8 @@ func TestMemSeries_append(t *testing.T) {
func TestMemSeries_appendHistogram(t *testing.T) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: int64(1000),
@ -2302,11 +2294,8 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
const samplesPerChunk = 120
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, chunkDiskMapper.Close())
})
chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: DefaultBlockDuration,
@ -3651,11 +3640,8 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
func TestIteratorSeekIntoBuffer(t *testing.T) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: 500,
@ -5680,7 +5666,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
uc := newUnsupportedChunk()
// Make this chunk not overlap with the previous and the next
h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false, func(err error) { require.NoError(t, err) })
h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false)
app = h.Appender(ctx)
for i := 700; i < 1200; i++ {

View file

@ -108,8 +108,8 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
if err != nil {
handleChunkWriteError(err)
return nil
// TODO(bwplotka): Propagate error correctly.
panic(err)
}
for _, chk := range chks {
addChunk(chk.minTime, chk.maxTime, ref, chk.chunk)