test/integration: synchronize scheduler shutdown to fix metrics data race

Wait for the scheduler goroutine to exit before resetting global metrics.
Previously, metrics updates from a still-running scheduler could race
with legacyregistry.Reset() at the end of a performance test workload.

This change refactors StartScheduler into StartSchedulerWithDone to
provide a synchronization channel that is closed when the scheduler
actually stops, ensuring a clean teardown before registry cleanup.
This commit is contained in:
Matt Matejczyk 2026-04-09 15:29:39 +00:00
parent bc15d50fd2
commit 34b740db6b
3 changed files with 33 additions and 13 deletions

View file

@ -624,7 +624,7 @@ func initTestOutput(tb testing.TB) io.Writer {
var specialFilenameChars = regexp.MustCompile(`[^a-zA-Z0-9-_]`)
func setupTestCase(t testing.TB, tc *testCase, featureGates map[featuregate.Feature]bool, workload *Workload, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, ktesting.TContext) {
func setupTestCase(t testing.TB, tc *testCase, featureGates map[featuregate.Feature]bool, workload *Workload, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}, ktesting.TContext) {
tCtx := ktesting.Init(t, initoption.PerTestOutput(UseTestingLog))
artifacts, doArtifacts := os.LookupEnv("ARTIFACTS")
if !UseTestingLog && doArtifacts {
@ -820,7 +820,7 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin
fixJSONOutput(b)
featureGates := featureGatesMerge(tc.FeatureGates, w.FeatureGates)
scheduler, informerFactory, tCtx := setupTestCase(b, tc, featureGates, w, opts)
scheduler, informerFactory, schedulerDone, tCtx := setupTestCase(b, tc, featureGates, w, opts)
err := w.isValid(tc.MetricsCollectorConfig)
if err != nil {
@ -870,6 +870,10 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin
}
}
tCtx.Cancel("workload is done")
// Wait for the scheduler to stop to avoid data races when resetting metrics.
<-schedulerDone
// Reset metrics to prevent metrics generated in current workload gets
// carried over to the next workload.
legacyregistry.Reset()
@ -942,7 +946,7 @@ func RunIntegrationPerfScheduling(t *testing.T, configFile string, options ...Sc
t.Skipf("disabled by label filter %q", TestSchedulingLabelFilter)
}
featureGates := featureGatesMerge(tc.FeatureGates, w.FeatureGates)
scheduler, informerFactory, tCtx := setupTestCase(t, tc, featureGates, w, opts)
scheduler, informerFactory, schedulerDone, tCtx := setupTestCase(t, tc, featureGates, w, opts)
err := w.isValid(tc.MetricsCollectorConfig)
if err != nil {
t.Fatalf("workload %s is not valid: %v", w.Name, err)
@ -960,6 +964,10 @@ func RunIntegrationPerfScheduling(t *testing.T, configFile string, options ...Sc
}
}
tCtx.Cancel("workload is done")
// Wait for the scheduler to stop to avoid data races when resetting metrics.
<-schedulerDone
// Reset metrics to prevent metrics generated in current workload gets
// carried over to the next workload.
legacyregistry.Reset()
@ -1008,7 +1016,7 @@ func unrollWorkloadTemplate(tb ktesting.TB, wt []op, w *Workload) []op {
return unrolled
}
func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, ktesting.TContext) {
func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}, ktesting.TContext) {
var cfg *config.KubeSchedulerConfiguration
var err error
if configPath != "" {

View file

@ -93,7 +93,7 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
// remove resources after finished.
// Notes on rate limiter:
// - client rate limit is set to 5000.
func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, ktesting.TContext) {
func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, opts *schedulerPerfOptions) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}, ktesting.TContext) {
var runtimeConfig []string
if enabledFeatures[features.DynamicResourceAllocation] {
runtimeConfig = append(runtimeConfig, fmt.Sprintf("%s=true", resourceapi.SchemeGroupVersion))
@ -143,11 +143,10 @@ func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfig
// Not all config options will be effective but only those mostly related with scheduler performance will
// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
scheduler, informerFactory := util.StartScheduler(tCtx, config, opts.outOfTreePluginRegistry)
scheduler, informerFactory, done := util.StartSchedulerWithDone(tCtx, config, opts.outOfTreePluginRegistry)
util.StartFakePVController(tCtx, tCtx.Client(), informerFactory)
runGC := util.CreateGCController(tCtx, tCtx, *cfg, informerFactory)
runNS := util.CreateNamespaceController(tCtx, tCtx, *cfg, informerFactory)
runResourceClaimController := func() {}
if enabledFeatures[features.DynamicResourceAllocation] {
// Testing of DRA with inline resource claims depends on this
@ -166,7 +165,7 @@ func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfig
go runNS()
go runResourceClaimController()
return scheduler, informerFactory, tCtx
return scheduler, informerFactory, done, tCtx
}
func isAttempted(pod *v1.Pod) bool {

View file

@ -82,10 +82,19 @@ import (
// ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module
type ShutdownFunc func()
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface
// and event broadcaster. It returns the running scheduler and podInformer. Background goroutines
// will keep running until the context is canceled.
// StartScheduler is a wrapper around StartSchedulerWithDone for backward compatibility.
func StartScheduler(tCtx ktesting.TContext, cfg *kubeschedulerconfig.KubeSchedulerConfiguration, outOfTreePluginRegistry frameworkruntime.Registry) (*scheduler.Scheduler, informers.SharedInformerFactory) {
sched, informerFactory, _ := StartSchedulerWithDone(tCtx, cfg, outOfTreePluginRegistry)
return sched, informerFactory
}
// StartSchedulerWithDone configures and starts a scheduler. Background goroutines
// will keep running until the context is canceled. It returns the running scheduler,
// the informer factory, and a channel that is closed when the scheduler goroutine
// actually exits. Callers can use this channel to ensure the scheduler has fully
// stopped before performing operations that might race with it (e.g., resetting
// global metrics).
func StartSchedulerWithDone(tCtx ktesting.TContext, cfg *kubeschedulerconfig.KubeSchedulerConfiguration, outOfTreePluginRegistry frameworkruntime.Registry) (*scheduler.Scheduler, informers.SharedInformerFactory, <-chan struct{}) {
clientSet := tCtx.Client()
informerFactory := scheduler.NewInformerFactory(clientSet, 0)
evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
@ -121,9 +130,13 @@ func StartScheduler(tCtx ktesting.TContext, cfg *kubeschedulerconfig.KubeSchedul
err = sched.WaitForHandlersSync(tCtx)
tCtx.ExpectNoError(err, "waiting for handlers to sync")
logger.V(3).Info("Handlers synced")
go sched.Run(tCtx)
done := make(chan struct{})
go func() {
sched.Run(tCtx)
close(done)
}()
return sched, informerFactory
return sched, informerFactory, done
}
// CreateResourceClaimController creates a ResourceClaim controller and returns a blocking run function.