From 5384814a7448161e05cbbd76349ce123d0081bb4 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Wed, 4 Feb 2026 10:47:05 +0000 Subject: [PATCH] tests: Avoid deadlocks on DB closes with chunk mapper panics Signed-off-by: bwplotka --- storage/interface_append.go | 2 + tsdb/chunks/chunk_write_queue.go | 8 +- tsdb/chunks/chunk_write_queue_test.go | 58 ++------ tsdb/chunks/head_chunks.go | 106 ++++++++++++--- tsdb/chunks/head_chunks_test.go | 185 +++++++++++--------------- tsdb/db_append_v2_test.go | 91 ------------- tsdb/db_test.go | 42 ++++-- tsdb/head_append.go | 14 +- tsdb/head_append_v2_test.go | 2 +- tsdb/head_test.go | 54 +++----- tsdb/ooo_head_read.go | 4 +- 11 files changed, 240 insertions(+), 326 deletions(-) diff --git a/storage/interface_append.go b/storage/interface_append.go index beb17f9e16..3753544eb0 100644 --- a/storage/interface_append.go +++ b/storage/interface_append.go @@ -206,6 +206,8 @@ type AppenderTransaction interface { // This is to support migration to AppenderV2. // TODO(bwplotka): Remove once migration to AppenderV2 is fully complete. type LimitedAppenderV1 interface { + AppenderTransaction + Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) } diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 1a046ea00a..61d6cdf1af 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -43,7 +43,6 @@ type chunkWriteJob struct { chk chunkenc.Chunk ref ChunkDiskMapperRef isOOO bool - callback func(error) } // chunkWriteQueue is a queue for writing chunks to disk in a non-blocking fashion. @@ -77,7 +76,7 @@ type chunkWriteQueue struct { } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. 
-type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error +type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue { counters := prometheus.NewCounterVec( @@ -131,10 +130,7 @@ func (c *chunkWriteQueue) start() { } func (c *chunkWriteQueue) processJob(job chunkWriteJob) { - err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.isOOO, job.cutFile) - if job.callback != nil { - job.callback(err) - } + c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.isOOO, job.cutFile) c.chunkRefMapMtx.Lock() defer c.chunkRefMapMtx.Unlock() diff --git a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go index 489ff74210..8e1a0366ef 100644 --- a/tsdb/chunks/chunk_write_queue_test.go +++ b/tsdb/chunks/chunk_write_queue_test.go @@ -14,7 +14,6 @@ package chunks import ( - "errors" "fmt" "sync" "testing" @@ -31,9 +30,8 @@ func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) { blockWriterWg.Add(1) // blockingChunkWriter blocks until blockWriterWg is done. 
- blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error { + blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) { blockWriterWg.Wait() - return nil } q := newChunkWriteQueue(nil, 1000, blockingChunkWriter) @@ -63,14 +61,15 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) { gotCutFile bool ) - blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, _, cutFile bool) error { + awaitCb := make(chan struct{}) + blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, _, cutFile bool) { gotSeriesRef = seriesRef gotMint = mint gotMaxt = maxt gotChunk = chunk gotRef = ref gotCutFile = cutFile - return nil + close(awaitCb) } q := newChunkWriteQueue(nil, 1000, blockingChunkWriter) @@ -81,10 +80,7 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) { chunk := chunkenc.NewXORChunk() ref := newChunkDiskMapperRef(321, 123) cutFile := true - awaitCb := make(chan struct{}) - require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile, callback: func(error) { - close(awaitCb) - }})) + require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile})) <-awaitCb // Compare whether the write function has received all job attributes correctly. @@ -100,27 +96,24 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { sizeLimit := 100 unblockChunkWriterCh := make(chan struct{}, sizeLimit) + var callbackWg sync.WaitGroup + // blockingChunkWriter blocks until the unblockChunkWriterCh channel returns a value. 
- blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error { + blockingChunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) { <-unblockChunkWriterCh - return nil + callbackWg.Done() } q := newChunkWriteQueue(nil, sizeLimit, blockingChunkWriter) + defer q.stop() // Unblock writers when shutting down. defer close(unblockChunkWriterCh) - var chunkRef ChunkDiskMapperRef - var callbackWg sync.WaitGroup + addChunk := func() { callbackWg.Add(1) - require.NoError(t, q.addJob(chunkWriteJob{ - ref: chunkRef, - callback: func(error) { - callbackWg.Done() - }, - })) + require.NoError(t, q.addJob(chunkWriteJob{ref: chunkRef})) chunkRef++ } @@ -182,37 +175,13 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { require.Eventually(t, q.queueIsEmpty, 500*time.Millisecond, 50*time.Millisecond) } -func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { - testError := errors.New("test error") - chunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error { - return testError - } - - awaitCb := make(chan struct{}) - var gotError error - callback := func(err error) { - gotError = err - close(awaitCb) - } - - q := newChunkWriteQueue(nil, 1, chunkWriter) - defer q.stop() - - job := chunkWriteJob{callback: callback} - require.NoError(t, q.addJob(job)) - - <-awaitCb - - require.Equal(t, testError, gotError) -} - func BenchmarkChunkWriteQueue_addJob(b *testing.B) { for _, withReads := range []bool{false, true} { b.Run(fmt.Sprintf("with reads %t", withReads), func(b *testing.B) { for _, concurrentWrites := range []int{1, 10, 100, 1000} { b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) { issueReadSignal := make(chan struct{}) - q := newChunkWriteQueue(nil, 1000, func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error { + q := newChunkWriteQueue(nil, 1000, 
func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) { if withReads { select { case issueReadSignal <- struct{}{}: @@ -220,7 +189,6 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) { // Can't write to issueReadSignal, don't block but omit read instead. } } - return nil }) b.Cleanup(func() { // Stopped already, so no more writes will happen. diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 809cd6b889..d9d05945b6 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -26,9 +26,11 @@ import ( "slices" "strconv" "sync" + "testing" "github.com/dennwc/varint" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" "go.uber.org/atomic" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -209,6 +211,9 @@ type ChunkDiskMapper struct { chkWriter *bufio.Writer // Writer for the current open file. crc32 hash.Hash writePathMtx sync.Mutex + // asyncWrites is useful for tests to synchronize queue-based async writes. + // Locked with writePathMtx. + asyncWrites int64 // Reader. // The int key in the map is the file number on the disk. @@ -228,7 +233,10 @@ type ChunkDiskMapper struct { writeQueue *chunkWriteQueue - closed bool + // TODO(bwplotka): async err handling for queued writes. Consider better error handling. + // Current prod async handleAsyncErr panics on errors other than closed. + testAsyncHandleWriteChunkErr func(error) + closed bool } // mmappedChunkFile provides mmap access to an entire head chunks file that holds many chunks. 
@@ -267,7 +275,19 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo } if writeQueueSize > 0 { - m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk) + m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, func( + seriesRef HeadSeriesRef, + mint int64, + maxt int64, + chk chunkenc.Chunk, + ref ChunkDiskMapperRef, + isOOO bool, + cutFile bool, + ) { + if err := m.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile); err != nil { + m.asyncHandleWriteChunkErr(err) + } + }) } if m.pool == nil { @@ -277,6 +297,36 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo return m, m.openMMapFiles() } +func (cdm *ChunkDiskMapper) SetTestAsyncHandleWriteChunkErr(fn func(error)) { + cdm.testAsyncHandleWriteChunkErr = fn +} + +func (cdm *ChunkDiskMapper) getAsyncWrites() int64 { + cdm.writePathMtx.Lock() + defer cdm.writePathMtx.Unlock() + return cdm.asyncWrites +} + +// asyncHandleWriteChunkErr allows async custom error handling for chunk write errors. It's async because +// some work is queued and done asynchronously. +// +// TODO(bwplotka): Don't be lazy. Propagate errors properly and return in some way (e.g. error channel), +// don't panic. We see those panics in tests on slow CIs and especially in tests, the panic then deadlocks DB closes +// (various locks are unlocked). +func (cdm *ChunkDiskMapper) asyncHandleWriteChunkErr(err error) { + if cdm.testAsyncHandleWriteChunkErr != nil { + cdm.testAsyncHandleWriteChunkErr(err) + return + } + if testing.Testing() { + panic("ensure testAsyncHandleWriteChunkErr is always mocked for clean test failures") + } + if errors.Is(err, ErrChunkDiskMapperClosed) { + return + } + panic(err) +} + // Chunk encodings for out-of-order chunks. // These encodings must be only used by the Head block for its internal bookkeeping. 
const ( @@ -454,33 +504,29 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro // WriteChunk writes the chunk to disk. // The returned chunk ref is the reference from where the chunk encoding starts for the chunk. -func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) { +func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool) (chkRef ChunkDiskMapperRef) { // cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue). cdm.evtlPosMtx.Lock() defer cdm.evtlPosMtx.Unlock() ref, cutFile := cdm.evtlPos.getNextChunkRef(chk) if cdm.writeQueue != nil { - return cdm.writeChunkViaQueue(ref, isOOO, cutFile, seriesRef, mint, maxt, chk, callback) + return cdm.writeChunkViaQueue(ref, isOOO, cutFile, seriesRef, mint, maxt, chk) } - err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile) - if callback != nil { - callback(err) + if err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile); err != nil { + cdm.asyncHandleWriteChunkErr(err) } - return ref } -func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { +func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef) { var err error - if callback != nil { - defer func() { - if err != nil { - callback(err) - } - }() - } + defer func() { + if err != nil { + cdm.asyncHandleWriteChunkErr(err) + } + }() err = cdm.writeQueue.addJob(chunkWriteJob{ cutFile: cutFile, @@ -490,7 +536,6 @@ func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, 
isOOO, cu chk: chk, ref: ref, isOOO: isOOO, - callback: callback, }) return ref @@ -560,7 +605,7 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64 return err } } - + cdm.asyncWrites++ return nil } @@ -1156,3 +1201,28 @@ func (cb *chunkBuffer) clear() { cb.inBufferChunksMtxs[i].Unlock() } } + +// NewTestChunkDiskMapper returns new chunk mapper for testing purposes. +// This is mostly needed for rigorous async error handling until we get rid of panics. +func NewTestChunkDiskMapper(t testing.TB, dir string, writeQueueSize int) *ChunkDiskMapper { + hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, hrw.Close()) + }) + + // TODO(bwplotka): At the moment normal async handling panics. This can get the tests stuck (recovery + further close + // with unlocked mutexes). Replace panics with something more robust, but for now handle those (expect no error) + // in tests. + hrw.testAsyncHandleWriteChunkErr = func(err error) { + t.Helper() + t.Error(err) + } + + require.False(t, hrw.fileMaxtSet) + require.NoError(t, hrw.IterateAllChunks(func(HeadSeriesRef, ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error { + return nil + })) + require.True(t, hrw.fileMaxtSet) + return hrw +} diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index c3cbc5a618..c0a6eccdbc 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -19,18 +19,20 @@ import ( "math/rand" "os" "strconv" - "sync" "testing" "time" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/testutil/synctest" ) var writeQueueSize int func TestMain(m *testing.M) { + // TODO(bwplotka): This is too hidden; consider moving to explicit subtests. + // Run all tests with the chunk write queue disabled. 
writeQueueSize = 0 exitVal := m.Run() @@ -44,11 +46,18 @@ func TestMain(m *testing.M) { os.Exit(exitVal) } +func syncWrite(t testing.TB, hrw *ChunkDiskMapper, writeFn func()) { + t.Helper() + + before := hrw.getAsyncWrites() + writeFn() + require.Eventually(t, func() bool { + return hrw.getAsyncWrites() > before + }, 1*time.Minute, 10*time.Millisecond) +} + func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { - hrw := createChunkDiskMapper(t, "") - defer func() { - require.NoError(t, hrw.Close()) - }() + hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize) expectedBytes := []byte{} nextChunkOffset := uint64(HeadChunkFileHeaderSize) @@ -152,7 +161,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { // Testing IterateAllChunks method. dir := hrw.dir.Name() require.NoError(t, hrw.Close()) - hrw = createChunkDiskMapper(t, dir) + hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize) idx := 0 require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, _, maxt int64, numSamples uint16, _ chunkenc.Encoding, isOOO bool) error { @@ -181,10 +190,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { // * The active file is not deleted even if the passed time makes it eligible to be deleted. // * Non-empty current file leads to creation of another file after truncation. 
func TestChunkDiskMapper_Truncate(t *testing.T) { - hrw := createChunkDiskMapper(t, "") - defer func() { - require.NoError(t, hrw.Close()) - }() + hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize) timeRange := 0 addChunk := func() { @@ -192,14 +198,9 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { step := 100 mint, maxt := timeRange+1, timeRange+step-1 - var err error - awaitCb := make(chan struct{}) - hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) { - err = cbErr - close(awaitCb) + syncWrite(t, hrw, func() { + _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false) }) - <-awaitCb - require.NoError(t, err) timeRange += step } @@ -237,7 +238,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { require.NoError(t, hrw.Close()) // Restarted. - hrw = createChunkDiskMapper(t, dir) + hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize) verifyFiles([]int{3, 4, 5, 6, 7, 8}) // New file is created after restart even if last file was empty. @@ -275,24 +276,18 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation // simply deleted all empty files instead of stopping once it encountered a non-empty file. 
func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { - hrw := createChunkDiskMapper(t, "") - defer func() { - require.NoError(t, hrw.Close()) - }() + hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize) timeRange := 0 addChunk := func() { t.Helper() - awaitCb := make(chan struct{}) - step := 100 mint, maxt := timeRange+1, timeRange+step-1 - hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(err error) { - close(awaitCb) - require.NoError(t, err) + + syncWrite(t, hrw, func() { + _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false) }) - <-awaitCb timeRange += step } @@ -354,83 +349,76 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting checks for unsequential files. - hrw = createChunkDiskMapper(t, dir) + hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize) + verifyFiles([]int{5, 6, 7}) } func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) { - hrw := createChunkDiskMapper(t, "") - t.Cleanup(func() { - require.NoError(t, hrw.Close()) - }) + hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize) // This test should only run when the queue is enabled. if hrw.writeQueue == nil { t.Skip("This test should only run when the queue is enabled") } - // Add an artificial delay in the writeChunk function to easily trigger the race condition. - origWriteChunk := hrw.writeQueue.writeChunk - hrw.writeQueue.writeChunk = func(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error { - time.Sleep(100 * time.Millisecond) - return origWriteChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile) - } + synctest.Test(t, func(t *testing.T) { + // Add an artificial delay in the writeChunk function to trigger the race condition. 
+ origWriteChunk := hrw.writeQueue.writeChunk + hrw.writeQueue.writeChunk = func(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) { + time.Sleep(100 * time.Millisecond) + origWriteChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile) + } - wg := sync.WaitGroup{} - wg.Add(2) + // Write a chunk. Since the queue is enabled, the chunk will be written asynchronously with our writeQueue + // that adds the artificial delay. + ref := hrw.WriteChunk(1, 0, 10, randomChunk(t), false) - // Write a chunk. Since the queue is enabled, the chunk will be written asynchronously (with the artificial delay). - ref := hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) { - defer wg.Done() - require.NoError(t, err) + seq, _ := ref.Unpack() + require.Equal(t, 1, seq) + + // Truncate, simulating that all chunks from segment files before 1 can be dropped. + require.NoError(t, hrw.Truncate(1)) + + // Unblock first write. + synctest.Wait() + + // Request to cut a new file when writing the next chunk. If there's a race condition, cutting a new file will + // allow us to detect there's actually an issue with the sequence number (because it's checked when a new segment + // file is created). + hrw.CutNewFile() + + // Write another chunk. This will cut a new file. + ref = hrw.WriteChunk(1, 0, 10, randomChunk(t), false) + + seq, _ = ref.Unpack() + require.Equal(t, 2, seq) + + // Unblock second write. + synctest.Wait() }) - seq, _ := ref.Unpack() - require.Equal(t, 1, seq) - - // Truncate, simulating that all chunks from segment files before 1 can be dropped. - require.NoError(t, hrw.Truncate(1)) - - // Request to cut a new file when writing the next chunk. If there's a race condition, cutting a new file will - // allow us to detect there's actually an issue with the sequence number (because it's checked when a new segment - // file is created). - hrw.CutNewFile() - - // Write another chunk. This will cut a new file. 
- ref = hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) { - defer wg.Done() - require.NoError(t, err) - }) - - seq, _ = ref.Unpack() - require.Equal(t, 2, seq) - - wg.Wait() + // Ensure writes eventually happened. + require.Eventually(t, func() bool { + return hrw.getAsyncWrites() == 2 + }, 1*time.Minute, 10*time.Millisecond) } // TestHeadReadWriter_TruncateAfterFailedIterateChunks tests for // https://github.com/prometheus/prometheus/issues/7753 func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { - hrw := createChunkDiskMapper(t, "") - defer func() { - require.NoError(t, hrw.Close()) - }() + hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize) // Write a chunks to iterate on it later. - var err error - awaitCb := make(chan struct{}) - hrw.WriteChunk(1, 0, 1000, randomChunk(t), false, func(cbErr error) { - err = cbErr - close(awaitCb) + syncWrite(t, hrw, func() { + hrw.WriteChunk(1, 0, 1000, randomChunk(t), false) }) - <-awaitCb - require.NoError(t, err) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. - hrw = createChunkDiskMapper(t, dir) + hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize) // Forcefully failing IterateAllChunks. 
require.Error(t, hrw.IterateAllChunks(func(HeadSeriesRef, ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error { @@ -442,7 +430,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { } func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { - hrw := createChunkDiskMapper(t, "") + hrw := NewTestChunkDiskMapper(t, t.TempDir(), writeQueueSize) timeRange := 0 addChunk := func() { @@ -450,14 +438,11 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { step := 100 mint, maxt := timeRange+1, timeRange+step-1 - var err error - awaitCb := make(chan struct{}) - hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) { - err = cbErr - close(awaitCb) + + // Write a chunk to iterate on it later. + syncWrite(t, hrw, func() { + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false) }) - <-awaitCb - require.NoError(t, err) timeRange += step } nonEmptyFile := func() { @@ -497,7 +482,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { checkRepair := func() { // Open chunk disk mapper again, corrupt file should be removed. - hrw = createChunkDiskMapper(t, dir) + hrw = NewTestChunkDiskMapper(t, dir, writeQueueSize) // Removed from memory.
require.Len(t, hrw.mmappedChunkFiles, 3) @@ -537,22 +522,6 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { checkRepair() } -func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper { - if dir == "" { - dir = t.TempDir() - } - - hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(HeadSeriesRef, ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error { - return nil - })) - require.True(t, hrw.fileMaxtSet) - - return hrw -} - func randomChunk(t *testing.T) chunkenc.Chunk { chunk := chunkenc.NewXORChunk() length := rand.Int() % 120 @@ -565,19 +534,17 @@ func randomChunk(t *testing.T) chunkenc.Chunk { } func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk, isOOO bool) { - var err error seriesRef = HeadSeriesRef(rand.Int63()) mint = int64((idx)*1000 + 1) maxt = int64((idx + 1) * 1000) chunk = randomChunk(t) - awaitCb := make(chan struct{}) + if rand.Intn(2) == 0 { isOOO = true } - chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, isOOO, func(error) { - require.NoError(t, err) - close(awaitCb) + + syncWrite(t, hrw, func() { + chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, isOOO) }) - <-awaitCb return seriesRef, chunkRef, mint, maxt, chunk, isOOO } diff --git a/tsdb/db_append_v2_test.go b/tsdb/db_append_v2_test.go index 16134e8c93..e6bcfb696d 100644 --- a/tsdb/db_append_v2_test.go +++ b/tsdb/db_append_v2_test.go @@ -7049,97 +7049,6 @@ func testPanicOnApplyConfigAppendV2(t *testing.T, scenario sampleTypeScenario) { require.NoError(t, err) } -func TestDiskFillingUpAfterDisablingOOO_AppendV2(t *testing.T) { - t.Parallel() - for name, scenario := range sampleTypeScenarios { - t.Run(name, func(t *testing.T) { - 
testDiskFillingUpAfterDisablingOOOAppenderV2(t, scenario) - }) - } -} - -func testDiskFillingUpAfterDisablingOOOAppenderV2(t *testing.T, scenario sampleTypeScenario) { - t.Parallel() - ctx := context.Background() - - opts := DefaultOptions() - opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds() - - db := newTestDB(t, withOpts(opts)) - db.DisableCompactions() - - series1 := labels.FromStrings("foo", "bar1") - var allSamples []chunks.Sample - addSamples := func(fromMins, toMins int64) { - app := db.AppenderV2(context.Background()) - for m := fromMins; m <= toMins; m++ { - ts := m * time.Minute.Milliseconds() - _, s, err := scenario.appendFunc(storage.AppenderV2AsLimitedV1(app), series1, ts, ts) - require.NoError(t, err) - allSamples = append(allSamples, s) - } - require.NoError(t, app.Commit()) - } - - // In-order samples. - addSamples(290, 300) - // OOO samples. - addSamples(250, 299) - - // Restart DB with OOO disabled. - require.NoError(t, db.Close()) - - opts.OutOfOrderTimeWindow = 0 - db = newTestDB(t, withDir(db.Dir()), withOpts(opts)) - db.DisableCompactions() - - ms := db.head.series.getByHash(series1.Hash(), series1) - require.NotEmpty(t, ms.ooo.oooMmappedChunks, "OOO mmap chunk was not replayed") - - checkMmapFileContents := func(contains, notContains []string) { - mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot) - files, err := os.ReadDir(mmapDir) - require.NoError(t, err) - - fnames := make([]string, 0, len(files)) - for _, f := range files { - fnames = append(fnames, f.Name()) - } - - for _, f := range contains { - require.Contains(t, fnames, f) - } - for _, f := range notContains { - require.NotContains(t, fnames, f) - } - } - - // Add in-order samples until ready for compaction.. - addSamples(301, 500) - - // Check that m-map files gets deleted properly after compactions. 
- - db.head.mmapHeadChunks() - checkMmapFileContents([]string{"000001", "000002"}, nil) - require.NoError(t, db.Compact(ctx)) - checkMmapFileContents([]string{"000002"}, []string{"000001"}) - require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted") - - addSamples(501, 650) - db.head.mmapHeadChunks() - checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"}) - require.NoError(t, db.Compact(ctx)) - checkMmapFileContents(nil, []string{"000001", "000002", "000003"}) - - // Verify that WBL is empty. - files, err := os.ReadDir(db.head.wbl.Dir()) - require.NoError(t, err) - require.Len(t, files, 1) // Last empty file after compaction. - finfo, err := files[0].Info() - require.NoError(t, err) - require.Equal(t, int64(0), finfo.Size()) -} - func TestHistogramAppendAndQuery_AppendV2(t *testing.T) { t.Run("integer histograms", func(t *testing.T) { testHistogramAppendAndQueryHelperAppendV2(t, false) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 403ce3636a..aa3a2d25e6 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -126,6 +126,15 @@ func newTestDB(t testing.TB, opts ...testDBOpt) (db *DB) { db, err = open(o.dir, nil, nil, o.opts, o.rngs, nil) } require.NoError(t, err) + + db.Head().chunkDiskMapper.SetTestAsyncHandleWriteChunkErr(func(err error) { + // TODO(bwplotka): At the moment normal async handling panics. This can get the tests stuck (recovery + further close + // with unlocked mutexes). Replace panics with something more robust, but for now handle those (expect no error) + // in tests. + t.Helper() + t.Error(err) + }) + t.Cleanup(func() { // Always close. DB is safe for close-after-close. 
require.NoError(t, db.Close()) @@ -8304,16 +8313,22 @@ func testPanicOnApplyConfig(t *testing.T, scenario sampleTypeScenario) { func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { t.Parallel() - for name, scenario := range sampleTypeScenarios { - t.Run(name, func(t *testing.T) { - testDiskFillingUpAfterDisablingOOO(t, scenario) - }) + for _, appV2 := range []bool{true, false} { + for name, scenario := range sampleTypeScenarios { + t.Run(fmt.Sprintf("sample=%v/appV2=%v", name, appV2), func(t *testing.T) { + testDiskFillingUpAfterDisablingOOO(t, scenario, func(db *DB, ctx context.Context) storage.LimitedAppenderV1 { + if appV2 { + return storage.AppenderV2AsLimitedV1(db.AppenderV2(ctx)) + } + return db.Appender(ctx) + }) + }) + } } } -func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenario) { +func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenario, appenderFn func(db *DB, ctx context.Context) storage.LimitedAppenderV1) { t.Parallel() - ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds() @@ -8321,10 +8336,14 @@ func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenari db := newTestDB(t, withOpts(opts)) db.DisableCompactions() - series1 := labels.FromStrings("foo", "bar1") - var allSamples []chunks.Sample + var ( + ctx = t.Context() + series1 = labels.FromStrings("foo", "bar1") + allSamples []chunks.Sample + ) + addSamples := func(fromMins, toMins int64) { - app := db.Appender(context.Background()) + app := appenderFn(db, ctx) for m := fromMins; m <= toMins; m++ { ts := m * time.Minute.Milliseconds() _, s, err := scenario.appendFunc(app, series1, ts, ts) @@ -8367,13 +8386,16 @@ func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenari } } - // Add in-order samples until ready for compaction.. + // Add in-order samples until ready for compaction. 
addSamples(301, 500) // Check that m-map files gets deleted properly after compactions. db.head.mmapHeadChunks() checkMmapFileContents([]string{"000001", "000002"}, nil) + + // NOTE: We are investigating flaky errors from this compaction on the 386 (32-bit x86) architecture. + // See https://github.com/prometheus/prometheus/issues/17941#issuecomment-3846381263 require.NoError(t, db.Compact(ctx)) checkMmapFileContents([]string{"000002"}, []string{"000001"}) require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted") diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 005d20b720..64ecfcc287 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -2181,8 +2181,8 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap } chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) if err != nil { - handleChunkWriteError(err) - return nil + // TODO(bwplotka): Propagate error correctly. + panic(err) } chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, len(chks)) for _, memchunk := range chks { @@ -2190,7 +2190,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap logger.Error("Too many OOO chunks, dropping data", "series", s.lset.String()) break } - chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true, handleChunkWriteError) + chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true) chunkRefs = append(chunkRefs, chunkRef) s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ ref: chunkRef, @@ -2215,7 +2215,7 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i // then we need to write chunks t0 to t3, but skip s.headChunks.
for i := s.headChunks.len() - 1; i > 0; i-- { chk := s.headChunks.atOffset(i) - chunkRef := chunkDiskMapper.WriteChunk(s.ref, chk.minTime, chk.maxTime, chk.chunk, false, handleChunkWriteError) + chunkRef := chunkDiskMapper.WriteChunk(s.ref, chk.minTime, chk.maxTime, chk.chunk, false) s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ ref: chunkRef, numSamples: uint16(chk.chunk.NumSamples()), @@ -2231,12 +2231,6 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i return count } -func handleChunkWriteError(err error) { - if err != nil && !errors.Is(err, chunks.ErrChunkDiskMapperClosed) { - panic(err) - } -} - // Rollback removes the samples and exemplars from headAppender and writes any series to WAL. func (a *headAppenderBase) Rollback() (err error) { if a.closed { diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index 082d756e60..6b207b51de 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -3124,7 +3124,7 @@ func TestHead_Init_DiscardChunksWithUnsupportedEncoding(t *testing.T) { uc := newUnsupportedChunk() // Make this chunk not overlap with the previous and the next - h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false, func(err error) { require.NoError(t, err) }) + h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false) app = h.AppenderV2(ctx) for i := 700; i < 1200; i++ { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 7b8ae0ecbd..4b870b2228 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -93,6 +93,13 @@ func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *He _ = h.Close() }) + // TODO(bwplotka): At the moment normal async handling panics. This can get the tests stuck (recovery + further close + // with unlocked mutexes). Replace panics with something more robust, but for now handle those. 
+ h.chunkDiskMapper.SetTestAsyncHandleWriteChunkErr(func(err error) { + t.Helper() + t.Error(err) + }) + require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(chunks.HeadSeriesRef, chunks.ChunkDiskMapperRef, int64, int64, uint16, chunkenc.Encoding, bool) error { return nil })) @@ -338,8 +345,7 @@ func BenchmarkLoadWLs(b *testing.B) { // Write mmapped chunks. if c.mmappedChunkT != 0 { - chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(b, err) + chunkDiskMapper := chunks.NewTestChunkDiskMapper(b, mmappedChunksDir(dir), chunks.DefaultWriteQueueSize) cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, chunkRange: c.mmappedChunkT, @@ -1467,11 +1473,8 @@ func TestHead_Truncate(t *testing.T) { func TestMemSeries_truncateChunks(t *testing.T) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(t, err) - defer func() { - require.NoError(t, chunkDiskMapper.Close()) - }() + chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize) + cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, chunkRange: 2000, @@ -1618,12 +1621,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - dir := t.TempDir() - chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(t, err) - defer func() { - require.NoError(t, chunkDiskMapper.Close()) - }() + chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, t.TempDir(), chunks.DefaultWriteQueueSize) series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false) @@ -2179,11 +2177,8 @@ func TestComputeChunkEndTime(t *testing.T) 
{ func TestMemSeries_append(t *testing.T) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(t, err) - defer func() { - require.NoError(t, chunkDiskMapper.Close()) - }() + chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize) + cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, chunkRange: 500, @@ -2240,11 +2235,8 @@ func TestMemSeries_append(t *testing.T) { func TestMemSeries_appendHistogram(t *testing.T) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(t, err) - defer func() { - require.NoError(t, chunkDiskMapper.Close()) - }() + chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize) + cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, chunkRange: int64(1000), @@ -2302,11 +2294,8 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { const samplesPerChunk = 120 dir := t.TempDir() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, chunkDiskMapper.Close()) - }) + chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize) + cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, chunkRange: DefaultBlockDuration, @@ -3651,11 +3640,8 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { func TestIteratorSeekIntoBuffer(t *testing.T) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. 
- chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(t, err) - defer func() { - require.NoError(t, chunkDiskMapper.Close()) - }() + chunkDiskMapper := chunks.NewTestChunkDiskMapper(t, dir, chunks.DefaultWriteQueueSize) + cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, chunkRange: 500, @@ -5680,7 +5666,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { uc := newUnsupportedChunk() // Make this chunk not overlap with the previous and the next - h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false, func(err error) { require.NoError(t, err) }) + h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false) app = h.Appender(ctx) for i := 700; i < 1200; i++ { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 5d2347c2d7..0698de7882 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -108,8 +108,8 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime) if err != nil { - handleChunkWriteError(err) - return nil + // TODO(bwplotka): Propagate error correctly. + panic(err) } for _, chk := range chks { addChunk(chk.minTime, chk.maxTime, ref, chk.chunk)