From 3d2647dc6f5d1816fdc1f19b35d73dc5d0b1002e Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 16 Feb 2026 10:30:46 +0100 Subject: [PATCH] rules: fix flaky TestAsyncRuleEvaluation on Windows (#17965) Convert all timing-sensitive subtests of TestAsyncRuleEvaluation to use synctest for deterministic testing. This fixes flakiness on Windows caused by timer granularity and scheduling variance. The timing assertions are preserved using synctest's fake time, which allows accurate verification of sequential vs concurrent execution timing without relying on wall-clock time. Fixes #17961 Signed-off-by: Arve Knudsen --- rules/manager_test.go | 473 +++++++++++++++++++++--------------------- 1 file changed, 237 insertions(+), 236 deletions(-) diff --git a/rules/manager_test.go b/rules/manager_test.go index 1b9f4be7d5..27930fc4c7 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -49,6 +49,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/teststorage" prom_testutil "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestMain(m *testing.M) { @@ -2010,306 +2011,306 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { func TestAsyncRuleEvaluation(t *testing.T) { t.Run("synchronous evaluation with independent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - expectedRuleCount := 6 - expectedSampleCount := 4 + expectedRuleCount := 6 + expectedSampleCount := 4 - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Nil(t, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Nil(t, order) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is higher than the sum of rule durations (group overhead). 
- require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is higher than the sum of rule durations (group overhead). + require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) }) t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 6 - expectedSampleCount := 4 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 6 + expectedSampleCount := 4 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. - require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. 
+ require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order (isn't affected by concurrency settings) - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, - }, order) + // Expected evaluation order (isn't affected by concurrency settings) + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. - require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. 
+ require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) + + start := time.Now() + + DefaultEvalIterationFunc(ctx, group, start) + + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is less than the sum of rule durations + require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) + }) + + t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + }) + + t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 8 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, + {0, 4}, + {1, 2, 3, 5, 6, 7}, }, order) - // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. - require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is less than the sum of rule durations - require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } - }) - - t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - - for _, group := range groups { - require.Len(t, group.rules, ruleCount) - - start := time.Now() - group.Eval(ctx, start) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. 
- require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. + require.EqualValues(t, 6, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } - }) - - t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 8 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } - - start := time.Now() - - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 4}, - {1, 2, 3, 5, 6, 7}, - }, order) - - group.Eval(ctx, start) - - // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. - require.EqualValues(t, 6, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } - start := time.Now() + start := time.Now() - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1}, - {2}, - {3}, - {4, 5, 6}, - }, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1}, + {2}, + {3}, + {4, 5, 6}, + }, order) - group.Eval(ctx, start) + group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, 3, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) }
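
Note (editorial, not part of the applied diff): the conversion above relies on
Go's synctest bubbles, in which the time package uses a fake clock that only
advances while every goroutine in the bubble is durably blocked. That is what
removes the Windows timer-granularity and scheduling variance while keeping the
timing assertions meaningful. Below is a minimal, self-contained sketch of the
pattern, assuming the util/testutil/synctest wrapper is backed by the standard
library's testing/synctest package (Go 1.25+) and using a hypothetical 250ms
stand-in for artificialDelay:

package rules_sketch

import (
	"testing"
	"testing/synctest" // stdlib bubble; Prometheus wraps an equivalent helper in util/testutil/synctest
	"time"
)

func TestSequentialTimingIsDeterministic(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		start := time.Now() // fake clock inside the bubble
		for i := 0; i < 6; i++ {
			// Stand-in for one sequential rule evaluation's artificial delay.
			time.Sleep(250 * time.Millisecond)
		}
		// With fake time there is no scheduler or timer jitter, so the elapsed
		// duration is exact and timing assertions cannot flake.
		if got, want := time.Since(start), 6*250*time.Millisecond; got != want {
			t.Fatalf("elapsed fake time = %s, want %s", got, want)
		}
	})
}

Under this model, assertions of the form require.GreaterOrEqual(time.Since(start),
N*artificialDelay) hold exactly for sequential evaluation, and the corresponding
require.Less(...) checks hold for concurrent evaluation, independent of the host's
wall-clock behaviour.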