diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 7645663f894..e1d37624742 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -624,7 +624,7 @@ func initTestOutput(tb testing.TB) io.Writer { var specialFilenameChars = regexp.MustCompile(`[^a-zA-Z0-9-_]`) -func setupTestCase(t testing.TB, tc *testCase, featureGates map[featuregate.Feature]bool, workload *Workload, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, ktesting.TContext) { +func setupTestCase(t testing.TB, tc *testCase, featureGates map[featuregate.Feature]bool, workload *Workload, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}, ktesting.TContext) { tCtx := ktesting.Init(t, initoption.PerTestOutput(UseTestingLog)) artifacts, doArtifacts := os.LookupEnv("ARTIFACTS") if !UseTestingLog && doArtifacts { @@ -820,7 +820,7 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin fixJSONOutput(b) featureGates := featureGatesMerge(tc.FeatureGates, w.FeatureGates) - scheduler, informerFactory, tCtx := setupTestCase(b, tc, featureGates, w, opts) + scheduler, informerFactory, schedulerDone, tCtx := setupTestCase(b, tc, featureGates, w, opts) err := w.isValid(tc.MetricsCollectorConfig) if err != nil { @@ -870,6 +870,10 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin } } + tCtx.Cancel("workload is done") + // Wait for the scheduler to stop to avoid data races when resetting metrics. + <-schedulerDone + // Reset metrics to prevent metrics generated in current workload gets // carried over to the next workload. 
legacyregistry.Reset() @@ -942,7 +946,7 @@ func RunIntegrationPerfScheduling(t *testing.T, configFile string, options ...Sc t.Skipf("disabled by label filter %q", TestSchedulingLabelFilter) } featureGates := featureGatesMerge(tc.FeatureGates, w.FeatureGates) - scheduler, informerFactory, tCtx := setupTestCase(t, tc, featureGates, w, opts) + scheduler, informerFactory, schedulerDone, tCtx := setupTestCase(t, tc, featureGates, w, opts) err := w.isValid(tc.MetricsCollectorConfig) if err != nil { t.Fatalf("workload %s is not valid: %v", w.Name, err) @@ -960,6 +964,10 @@ func RunIntegrationPerfScheduling(t *testing.T, configFile string, options ...Sc } } + tCtx.Cancel("workload is done") + // Wait for the scheduler to stop to avoid data races when resetting metrics. + <-schedulerDone + // Reset metrics to prevent metrics generated in current workload gets // carried over to the next workload. legacyregistry.Reset() @@ -1008,7 +1016,7 @@ func unrollWorkloadTemplate(tb ktesting.TB, wt []op, w *Workload) []op { return unrolled } -func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, ktesting.TContext) { +func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}, ktesting.TContext) { var cfg *config.KubeSchedulerConfiguration var err error if configPath != "" { diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 2cb5ed6107c..2106c3f6948 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -93,7 +93,7 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. 
-func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, ktesting.TContext) { +func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}, ktesting.TContext) { var runtimeConfig []string if enabledFeatures[features.DynamicResourceAllocation] { runtimeConfig = append(runtimeConfig, fmt.Sprintf("%s=true", resourceapi.SchemeGroupVersion)) @@ -143,11 +143,10 @@ func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfig // Not all config options will be effective but only those mostly related with scheduler performance will // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. - scheduler, informerFactory := util.StartScheduler(tCtx, config, opts.outOfTreePluginRegistry) + scheduler, informerFactory, done := util.StartSchedulerWithDone(tCtx, config, opts.outOfTreePluginRegistry) util.StartFakePVController(tCtx, tCtx.Client(), informerFactory) runGC := util.CreateGCController(tCtx, tCtx, *cfg, informerFactory) runNS := util.CreateNamespaceController(tCtx, tCtx, *cfg, informerFactory) - runResourceClaimController := func() {} if enabledFeatures[features.DynamicResourceAllocation] { // Testing of DRA with inline resource claims depends on this @@ -166,7 +165,7 @@ func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfig go runNS() go runResourceClaimController() - return scheduler, informerFactory, tCtx + return scheduler, informerFactory, done, tCtx } func isAttempted(pod *v1.Pod) bool { diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 6cb34cada36..87591319bf8 100644 --- a/test/integration/util/util.go +++ 
b/test/integration/util/util.go @@ -82,10 +82,19 @@ import ( // ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module type ShutdownFunc func() -// StartScheduler configures and starts a scheduler given a handle to the clientSet interface -// and event broadcaster. It returns the running scheduler and podInformer. Background goroutines -// will keep running until the context is canceled. +// StartScheduler is kept for backward compatibility: it calls StartSchedulerWithDone and returns only the running scheduler and the informer factory, discarding the done channel. func StartScheduler(tCtx ktesting.TContext, cfg *kubeschedulerconfig.KubeSchedulerConfiguration, outOfTreePluginRegistry frameworkruntime.Registry) (*scheduler.Scheduler, informers.SharedInformerFactory) { + sched, informerFactory, _ := StartSchedulerWithDone(tCtx, cfg, outOfTreePluginRegistry) + return sched, informerFactory +} + +// StartSchedulerWithDone configures and starts a scheduler. Background goroutines +// will keep running until the context is canceled. It returns the running scheduler, +// the informer factory, and a channel that is closed when the scheduler goroutine +// actually exits. Callers can use this channel to ensure the scheduler has fully +// stopped before performing operations that might race with it (e.g., resetting +// global metrics). 
+func StartSchedulerWithDone(tCtx ktesting.TContext, cfg *kubeschedulerconfig.KubeSchedulerConfiguration, outOfTreePluginRegistry frameworkruntime.Registry) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}) { clientSet := tCtx.Client() informerFactory := scheduler.NewInformerFactory(clientSet, 0) evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ @@ -121,9 +130,13 @@ func StartScheduler(tCtx ktesting.TContext, cfg *kubeschedulerconfig.KubeSchedul err = sched.WaitForHandlersSync(tCtx) tCtx.ExpectNoError(err, "waiting for handlers to sync") logger.V(3).Info("Handlers synced") - go sched.Run(tCtx) + done := make(chan struct{}) + go func() { + sched.Run(tCtx) + close(done) + }() - return sched, informerFactory + return sched, informerFactory, done } // CreateResourceClaimController creates a ResourceClaim controller and returns a blocking run function.