test: migrate TestDelayedCompaction to synctest to eliminate flakiness

The previous implementation relied on real wall-clock time and busy-loops
(time.Sleep + polling loops) to detect when compaction had finished, making
it both slow and flaky, especially on busy CI environments and on Windows due to timer imprecision.

Now both subtests run on Windows.

The delay value can be increased (1s → 5s) at zero cost to test runtime.

Also cleaned up shared logic into small helpers and split the no-delay and
delay-enabled cases into separate subtests for clarity.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
This commit is contained in:
machine424 2026-03-30 23:58:08 +02:00
parent dcfb8ce59c
commit 86215cf91f
No known key found for this signature in database
GPG key ID: A4B001A4FDEE017D

View file

@ -22,10 +22,10 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/oklog/ulid/v2"
@ -1978,162 +1978,127 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) {
}
func TestDelayedCompaction(t *testing.T) {
// The delay is chosen in such a way as to not slow down the tests, but also to make
// the effective compaction duration negligible compared to it, so that the duration comparisons make sense.
delay := 1000 * time.Millisecond
delay := 5 * time.Second
label := labels.FromStrings("foo", "bar")
waitUntilCompactedAndCheck := func(db *DB) {
t.Helper()
start := time.Now()
for db.head.compactable() {
// This simulates what happens at the end of commits, for less busy DB, a compaction
// is triggered every minute. This is to speed up the test.
select {
case db.compactc <- struct{}{}:
default:
}
time.Sleep(time.Millisecond)
appendSamples := func(t *testing.T, db *DB, timestamps ...int64) {
app := db.Appender(context.Background())
for _, ts := range timestamps {
_, err := app.Append(0, label, ts, 0)
require.NoError(t, err)
}
duration := time.Since(start)
// Only waited for one offset: offset<=delay<<<2*offset
require.Greater(t, duration, db.opts.CompactionDelay)
require.Less(t, duration, 2*db.opts.CompactionDelay)
require.NoError(t, app.Commit())
}
compactAndCheck := func(db *DB) {
t.Helper()
start := time.Now()
db.Compact(context.Background())
for db.head.compactable() {
time.Sleep(time.Millisecond)
compactorRanCount := func(db *DB) float64 {
return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)
}
t.Run("delay not enabled", func(t *testing.T) {
t.Parallel()
db := newTestDB(t, withRngs(10))
compactAndCheck := func() {
start := time.Now()
db.Compact(context.Background())
require.False(t, db.head.compactable())
require.Less(t, time.Since(start), delay)
}
duration := time.Since(start)
require.Less(t, duration, delay)
}
cases := []struct {
name string
// The delays are chosen in such a way as to not slow down the tests, but also in a way to make the
// effective compaction duration negligible compared to them, so that the duration comparisons make sense.
compactionDelay time.Duration
}{
{
"delayed compaction not enabled",
0,
},
{
"delayed compaction enabled",
delay,
},
}
db.DisableCompactions()
appendSamples(t, db, 0, 11, 21)
compactAndCheck()
require.Equal(t, 1.0, compactorRanCount(db))
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Time imprecision on windows makes the test flaky, see https://github.com/prometheus/prometheus/issues/16450")
}
t.Parallel()
db.DisableCompactions()
appendSamples(t, db, 31, 41)
compactAndCheck()
require.Equal(t, 3.0, compactorRanCount(db))
})
var opts *Options
if c.compactionDelay > 0 {
opts = &Options{CompactionDelay: c.compactionDelay}
}
db := newTestDB(t, withOpts(opts), withRngs(10))
t.Run("delay enabled", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
db := newTestDB(t, withOpts(&Options{CompactionDelay: delay}), withRngs(10))
label := labels.FromStrings("foo", "bar")
// The first compaction is expected to result in 1 block.
db.DisableCompactions()
app := db.Appender(context.Background())
_, err := app.Append(0, label, 0, 0)
require.NoError(t, err)
_, err = app.Append(0, label, 11, 0)
require.NoError(t, err)
_, err = app.Append(0, label, 21, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
if c.compactionDelay == 0 {
// When delay is not enabled, compaction should run on the first trigger.
compactAndCheck(db)
} else {
db.EnableCompactions()
waitUntilCompactedAndCheck(db)
// The db.compactc signals have been processed multiple times since a compaction is triggered every 1ms by waitUntilCompacted.
// This implies that the compaction delay doesn't block or wait on the initial trigger.
// 3 is an arbitrary value because it's difficult to determine the precise value.
require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0)
// The delay doesn't change the head blocks alignment.
require.Eventually(t, func() bool {
return db.head.MinTime() == db.compactor.(*LeveledCompactor).ranges[0]+1
}, 500*time.Millisecond, 10*time.Millisecond)
// One compaction was run and one block was produced.
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
}
// The second compaction is expected to result in 2 blocks.
// This ensures that the logic for compaction delay doesn't only work for the first compaction, but also takes into account the future compactions.
// This also ensures that no delay happens between consecutive compactions.
db.DisableCompactions()
app = db.Appender(context.Background())
_, err = app.Append(0, label, 31, 0)
require.NoError(t, err)
_, err = app.Append(0, label, 41, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
if c.compactionDelay == 0 {
// Compaction should still run on the first trigger.
compactAndCheck(db)
} else {
db.EnableCompactions()
waitUntilCompactedAndCheck(db)
}
// Two other compactions were run.
require.Eventually(t, func() bool {
return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) == 3.0
}, 500*time.Millisecond, 10*time.Millisecond)
if c.compactionDelay == 0 {
return
}
// This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered,
// auto compaction should stop waiting for the delay if the head is no longer compactable.
// Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay.
getTimeWhenCompactionDelayStarted := func() time.Time {
t.Helper()
getDelayStart := func() time.Time {
db.cmtx.Lock()
defer db.cmtx.Unlock()
return db.timeWhenCompactionDelayStarted
}
db.DisableCompactions()
app = db.Appender(context.Background())
_, err = app.Append(0, label, 51, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
// triggerAutoCompaction sends a compaction trigger and waits for it to be processed.
triggerAutoCompaction := func() {
db.compactc <- struct{}{}
synctest.Wait()
}
// First compaction: expect 1 block.
db.DisableCompactions()
appendSamples(t, db, 0, 11, 21)
db.EnableCompactions()
// First trigger starts the delay period; no compaction yet.
triggerAutoCompaction()
require.NotZero(t, getDelayStart())
require.Equal(t, 0.0, compactorRanCount(db))
// Compaction doesn't run before the delay expires.
time.Sleep(delay / 2)
triggerAutoCompaction()
require.Equal(t, 0.0, compactorRanCount(db))
// Compaction runs once the delay expires.
time.Sleep(delay / 2)
triggerAutoCompaction()
// One compaction was run and one block was produced.
require.Equal(t, 1.0, compactorRanCount(db))
// The compaction delay doesn't block or wait on the trigger;
// 3 triggers were processed above without blocking.
require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0)
// The delay doesn't change the head blocks alignment.
require.Equal(t, db.compactor.(*LeveledCompactor).ranges[0]+1, db.head.MinTime())
// Second compaction: expect 2 more blocks.
// This ensures that the logic for compaction delay doesn't only work for the first compaction, but also takes into account the future compactions.
// This also ensures that no delay happens between consecutive compactions.
db.DisableCompactions()
appendSamples(t, db, 31, 41)
db.EnableCompactions()
triggerAutoCompaction()
require.Equal(t, 1.0, compactorRanCount(db))
time.Sleep(delay / 2)
triggerAutoCompaction()
require.Equal(t, 1.0, compactorRanCount(db))
time.Sleep(delay / 2)
triggerAutoCompaction()
require.Equal(t, 3.0, compactorRanCount(db))
// This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered,
// auto compaction should stop waiting for the delay if the head is no longer compactable.
// Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay.
db.DisableCompactions()
appendSamples(t, db, 51)
require.True(t, db.head.compactable())
db.EnableCompactions()
// Trigger an auto compaction.
db.compactc <- struct{}{}
triggerAutoCompaction()
// That made auto compaction start waiting for the delay.
require.Eventually(t, func() bool {
return !getTimeWhenCompactionDelayStarted().IsZero()
}, 100*time.Millisecond, 10*time.Millisecond)
require.NotZero(t, getDelayStart())
// Trigger a manual compaction.
require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 50.0)))
require.Equal(t, 4.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
require.Equal(t, 4.0, compactorRanCount(db))
// Re-trigger an auto compaction.
db.compactc <- struct{}{}
triggerAutoCompaction()
// That made auto compaction stop waiting for the delay.
require.Eventually(t, func() bool {
return getTimeWhenCompactionDelayStarted().IsZero()
}, 100*time.Millisecond, 10*time.Millisecond)
require.Zero(t, getDelayStart())
})
}
})
}
// TestDelayedCompactionDoesNotBlockUnrelatedOps makes sure that when delayed compaction is enabled,