diff --git a/rules/manager_test.go b/rules/manager_test.go index 1b9f4be7d5..27930fc4c7 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -49,6 +49,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/teststorage" prom_testutil "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestMain(m *testing.M) { @@ -2010,306 +2011,306 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { func TestAsyncRuleEvaluation(t *testing.T) { t.Run("synchronous evaluation with independent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - expectedRuleCount := 6 - expectedSampleCount := 4 + expectedRuleCount := 6 + expectedSampleCount := 4 - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Nil(t, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Nil(t, order) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is higher than the sum of rule durations (group overhead). - require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is higher than the sum of rule durations (group overhead). + require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) }) t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 6 - expectedSampleCount := 4 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 6 + expectedSampleCount := 4 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. - require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) - start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) + start := time.Now() + DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order (isn't affected by concurrency settings) - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, - }, order) + // Expected evaluation order (isn't affected by concurrency settings) + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) - // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. - require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) }) t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - expectedRuleCount := 8 - expectedSampleCount := expectedRuleCount - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) - for _, group := range groups { - require.Len(t, group.rules, expectedRuleCount) + for _, group := range groups { + require.Len(t, group.rules, expectedRuleCount) + + start := time.Now() + + DefaultEvalIterationFunc(ctx, group, start) + + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1, 2, 3, 4, 5, 6, 7}, + }, order) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + // Group duration is less than the sum of rule durations + require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) + } + }) + }) + + t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + }) + + t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) + + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx := t.Context() + + ruleCount := 8 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } start := time.Now() - DefaultEvalIterationFunc(ctx, group, start) - // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5, 6, 7}, + {0, 4}, + {1, 2, 3, 5, 6, 7}, }, order) - // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. - require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) - // Each recording rule produces one vector. - require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - // Group duration is less than the sum of rule durations - require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) - } - }) - - t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_indeterminates.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - - for _, group := range groups { - require.Len(t, group.rules, ruleCount) - - start := time.Now() - group.Eval(ctx, start) - // Never expect more than 1 inflight query at a time. - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. + require.EqualValues(t, 6, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) - } - }) - - t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) - - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ruleCount := 8 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) - - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } - - start := time.Now() - - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 4}, - {1, 2, 3, 5, 6, 7}, - }, order) - - group.Eval(ctx, start) - - // Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently. - require.EqualValues(t, 6, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) { - t.Parallel() - storage := teststorage.New(t) + synctest.Test(t, func(t *testing.T) { + storage := teststorage.New(t) - inflightQueries := atomic.Int32{} - maxInflight := atomic.Int32{} + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + ctx := t.Context() - ruleCount := 7 - opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) - // Configure concurrency settings. - opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 - opts.RuleConcurrencyController = nil - ruleManager := NewManager(opts) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) - require.Empty(t, errs) - require.Len(t, groups, 1) - var group *Group - for _, g := range groups { - group = g - } + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, false, []string{"fixtures/rules_chain.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } - start := time.Now() + start := time.Now() - // Expected evaluation order - order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0, 1}, - {2}, - {3}, - {4, 5, 6}, - }, order) + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + require.Equal(t, []ConcurrentRules{ + {0, 1}, + {2}, + {3}, + {4, 5, 6}, + }, order) - group.Eval(ctx, start) + group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) - // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, 3, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) }) }