diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 8843777b93..ab5246fa3c 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "log/slog" "math" "math/rand" "os" @@ -29,6 +30,7 @@ import ( "time" "github.com/oklog/ulid/v2" + "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" @@ -1367,69 +1369,76 @@ func TestDisableAutoCompactions(t *testing.T) { // TestCancelCompactions ensures that when the db is closed // any running compaction is cancelled to unblock closing the db. func TestCancelCompactions(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + compactionStarted := make(chan struct{}) + compactionCanceled := make(chan struct{}) + + opts := DefaultOptions() + opts.NewCompactorFunc = func(ctx context.Context, _ prometheus.Registerer, _ *slog.Logger, _ []int64, _ chunkenc.Pool, _ *Options) (Compactor, error) { + return &mockCompactorFn{ + planFn: func() ([]string, error) { + return []string{"block-a", "block-b"}, nil + }, + compactFn: func() ([]ulid.ULID, error) { + close(compactionStarted) + <-ctx.Done() + close(compactionCanceled) + return nil, ctx.Err() + }, + // writeFn is unused: this test never reaches the head, OOO, + // or stale-series compaction paths that would call Write. + writeFn: func() ([]ulid.ULID, error) { + return nil, nil + }, + }, nil + } + db := newTestDB(t, withOpts(opts)) + + db.compactc <- struct{}{} + <-compactionStarted + + require.NoError(t, db.Close()) + <-compactionCanceled + + // Wrapped context.Canceled must not be counted as a real compaction + // failure (verifies errors.Is at every level of the chain). + require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed)) + }) +} + +type blockPopulatorFunc func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error + +func (f blockPopulatorFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error { + return f(ctx, metrics, logger, chunkPool, mergeFunc, blocks, meta, indexw, chunkw, postingsFunc) +} + +// TestCanceledCompactionDoesNotMarkBlocksFailed ensures that a compaction +// aborted via context cancellation does not mark its source blocks as +// Compaction.Failed. The context.Canceled error must be detected with +// errors.Is so wrapped values are still recognized at every level. +func TestCanceledCompactionDoesNotMarkBlocksFailed(t *testing.T) { t.Parallel() + tmpdir := t.TempDir() + blockDirs := []string{ + createBlock(t, tmpdir, genSeries(1, 1, 0, 100)), + createBlock(t, tmpdir, genSeries(1, 1, 100, 200)), + } - // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000)) - createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000)) - createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one. - - // Copy the db so we have an exact copy to compare compaction times. - tmpdirCopy := t.TempDir() - err := fileutil.CopyDirs(tmpdir, tmpdirCopy) + compactor, err := NewLeveledCompactor(t.Context(), nil, promslog.NewNopLogger(), []int64{200}, nil, nil) require.NoError(t, err) - // Measure the compaction time without interrupting it. - var timeCompactionUninterrupted time.Duration - { - db, err := open(tmpdir, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil) + _, err = compactor.CompactWithBlockPopulator(tmpdir, blockDirs, nil, blockPopulatorFunc( + func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error { + return context.Canceled + }, + )) + require.ErrorIs(t, err, context.Canceled) + + for _, dir := range blockDirs { + meta, _, err := readMetaFile(dir) require.NoError(t, err) - require.Len(t, db.Blocks(), 3, "initial block count mismatch") - require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch") - db.compactc <- struct{}{} // Trigger a compaction. - for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 { - time.Sleep(3 * time.Millisecond) - } - - start := time.Now() - for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) != 1 { - time.Sleep(3 * time.Millisecond) - } - timeCompactionUninterrupted = time.Since(start) - - require.NoError(t, db.Close()) - } - // Measure the compaction time when closing the db in the middle of compaction. - { - db, err := open(tmpdirCopy, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil) - require.NoError(t, err) - require.Len(t, db.Blocks(), 3, "initial block count mismatch") - require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch") - db.compactc <- struct{}{} // Trigger a compaction. - - for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 { - time.Sleep(3 * time.Millisecond) - } - - start := time.Now() - require.NoError(t, db.Close()) - actT := time.Since(start) - - expT := timeCompactionUninterrupted / 2 // Closing the db in the middle of compaction should less than half the time. - require.Less(t, actT, expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) - - // Make sure that no blocks were marked as compaction failed. - // This checks that the `context.Canceled` error is properly checked at all levels: - // - callers should check with errors.Is() instead of ==. - readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger()) - require.NoError(t, err) - blocks, err := readOnlyDB.Blocks() - require.NoError(t, err) - for i, b := range blocks { - require.Falsef(t, b.Meta().Compaction.Failed, "block %d (%s) should not be marked as compaction failed", i, b.Meta().ULID) - } - require.NoError(t, readOnlyDB.Close()) + require.Falsef(t, meta.Compaction.Failed, "block %s should not be marked as compaction failed", meta.ULID) } }