mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-09 00:22:19 -04:00
tsdb: Rewrite TestCancelCompactions to run faster (#18632)
* Rewrite TestCancelCompactions to run faster --------- Signed-off-by: Owen Williams <owen.williams@grafana.com> Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
0f5727f420
commit
5fe52643a0
1 changed files with 66 additions and 57 deletions
|
|
@ -17,6 +17,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
|
@ -29,6 +30,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/prometheus/common/promslog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
@ -1367,69 +1369,76 @@ func TestDisableAutoCompactions(t *testing.T) {
|
|||
// TestCancelCompactions ensures that when the db is closed
|
||||
// any running compaction is cancelled to unblock closing the db.
|
||||
func TestCancelCompactions(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
compactionStarted := make(chan struct{})
|
||||
compactionCanceled := make(chan struct{})
|
||||
|
||||
opts := DefaultOptions()
|
||||
opts.NewCompactorFunc = func(ctx context.Context, _ prometheus.Registerer, _ *slog.Logger, _ []int64, _ chunkenc.Pool, _ *Options) (Compactor, error) {
|
||||
return &mockCompactorFn{
|
||||
planFn: func() ([]string, error) {
|
||||
return []string{"block-a", "block-b"}, nil
|
||||
},
|
||||
compactFn: func() ([]ulid.ULID, error) {
|
||||
close(compactionStarted)
|
||||
<-ctx.Done()
|
||||
close(compactionCanceled)
|
||||
return nil, ctx.Err()
|
||||
},
|
||||
// writeFn is unused: this test never reaches the head, OOO,
|
||||
// or stale-series compaction paths that would call Write.
|
||||
writeFn: func() ([]ulid.ULID, error) {
|
||||
return nil, nil
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
db := newTestDB(t, withOpts(opts))
|
||||
|
||||
db.compactc <- struct{}{}
|
||||
<-compactionStarted
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
<-compactionCanceled
|
||||
|
||||
// Wrapped context.Canceled must not be counted as a real compaction
|
||||
// failure (verifies errors.Is at every level of the chain).
|
||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed))
|
||||
})
|
||||
}
|
||||
|
||||
type blockPopulatorFunc func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error
|
||||
|
||||
func (f blockPopulatorFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error {
|
||||
return f(ctx, metrics, logger, chunkPool, mergeFunc, blocks, meta, indexw, chunkw, postingsFunc)
|
||||
}
|
||||
|
||||
// TestCanceledCompactionDoesNotMarkBlocksFailed ensures that a compaction
|
||||
// aborted via context cancellation does not mark its source blocks as
|
||||
// Compaction.Failed. The context.Canceled error must be detected with
|
||||
// errors.Is so wrapped values are still recognized at every level.
|
||||
func TestCanceledCompactionDoesNotMarkBlocksFailed(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tmpdir := t.TempDir()
|
||||
blockDirs := []string{
|
||||
createBlock(t, tmpdir, genSeries(1, 1, 0, 100)),
|
||||
createBlock(t, tmpdir, genSeries(1, 1, 100, 200)),
|
||||
}
|
||||
|
||||
// Create some blocks to fall within the compaction range.
|
||||
createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000))
|
||||
createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000))
|
||||
createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one.
|
||||
|
||||
// Copy the db so we have an exact copy to compare compaction times.
|
||||
tmpdirCopy := t.TempDir()
|
||||
err := fileutil.CopyDirs(tmpdir, tmpdirCopy)
|
||||
compactor, err := NewLeveledCompactor(t.Context(), nil, promslog.NewNopLogger(), []int64{200}, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Measure the compaction time without interrupting it.
|
||||
var timeCompactionUninterrupted time.Duration
|
||||
{
|
||||
db, err := open(tmpdir, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
|
||||
_, err = compactor.CompactWithBlockPopulator(tmpdir, blockDirs, nil, blockPopulatorFunc(
|
||||
func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error {
|
||||
return context.Canceled
|
||||
},
|
||||
))
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
|
||||
for _, dir := range blockDirs {
|
||||
meta, _, err := readMetaFile(dir)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, db.Blocks(), 3, "initial block count mismatch")
|
||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
|
||||
db.compactc <- struct{}{} // Trigger a compaction.
|
||||
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) != 1 {
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
}
|
||||
timeCompactionUninterrupted = time.Since(start)
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
// Measure the compaction time when closing the db in the middle of compaction.
|
||||
{
|
||||
db, err := open(tmpdirCopy, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, db.Blocks(), 3, "initial block count mismatch")
|
||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
|
||||
db.compactc <- struct{}{} // Trigger a compaction.
|
||||
|
||||
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
require.NoError(t, db.Close())
|
||||
actT := time.Since(start)
|
||||
|
||||
expT := timeCompactionUninterrupted / 2 // Closing the db in the middle of compaction should less than half the time.
|
||||
require.Less(t, actT, expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
|
||||
|
||||
// Make sure that no blocks were marked as compaction failed.
|
||||
// This checks that the `context.Canceled` error is properly checked at all levels:
|
||||
// - callers should check with errors.Is() instead of ==.
|
||||
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger())
|
||||
require.NoError(t, err)
|
||||
blocks, err := readOnlyDB.Blocks()
|
||||
require.NoError(t, err)
|
||||
for i, b := range blocks {
|
||||
require.Falsef(t, b.Meta().Compaction.Failed, "block %d (%s) should not be marked as compaction failed", i, b.Meta().ULID)
|
||||
}
|
||||
require.NoError(t, readOnlyDB.Close())
|
||||
require.Falsef(t, meta.Compaction.Failed, "block %s should not be marked as compaction failed", meta.ULID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue