From 37629e3c21e68732609ae2d69810d6f08dd51dd2 Mon Sep 17 00:00:00 2001 From: Kevin Hannon Date: Mon, 25 May 2026 23:18:21 -0400 Subject: [PATCH] batch/job: Fix scheduling directives mutation for not-yet-started suspended Jobs When MutableSchedulingDirectivesForSuspendedJobs feature gate is enabled, it overwrites the notStarted check with a stricter condition requiring the JobSuspended=True condition. This rejects mutations on suspended Jobs that have never started but whose JobSuspended condition has not yet been set by the job controller, breaking external controllers like MultiKueue that inject scheduling directives immediately after creating a suspended Job. Preserve the notStarted path as an OR condition alongside the JobSuspended condition check, restoring pre-1.36 behavior for not-yet-started Jobs while maintaining the new relaxation for previously-started Jobs. Kubernetes-issue: https://github.com/kubernetes/kubernetes/issues/139281 --- pkg/registry/batch/job/strategy.go | 15 ++++- pkg/registry/batch/job/strategy_test.go | 52 ++++++++++++++++++ test/integration/job/job_test.go | 73 +++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 2 deletions(-) diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index 6232155658e..8e24b52c4f4 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -128,6 +128,16 @@ func (jobStrategy) Validate(ctx context.Context, obj runtime.Object) field.Error return batchvalidation.ValidateJob(job, opts) } +// shouldAllowMutablePodTemplate returns true if the Job's pod template should +// be mutable. This is safe for suspended jobs with no active pods that either +// have never started or have the JobSuspended condition set. +func shouldAllowMutablePodTemplate(job *batch.Job) bool { + suspended := job.Spec.Suspend != nil && *job.Spec.Suspend + notStarted := job.Status.StartTime == nil + hasSuspendedCondition := batchvalidation.IsConditionTrue(job.Status.Conditions, batch.JobSuspended) + return suspended && (notStarted || hasSuspendedCondition) && job.Status.Active == 0 +} + func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValidationOptions { var newPodTemplate, oldPodTemplate *core.PodTemplateSpec if newJob != nil { @@ -147,11 +157,12 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid suspended := oldJob.Spec.Suspend != nil && *oldJob.Spec.Suspend notStarted := oldJob.Status.StartTime == nil opts.AllowMutableSchedulingDirectives = suspended && notStarted + allowMutablePodTemplate := shouldAllowMutablePodTemplate(oldJob) if utilfeature.DefaultFeatureGate.Enabled(features.MutablePodResourcesForSuspendedJobs) { - opts.AllowMutablePodResources = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0 + opts.AllowMutablePodResources = allowMutablePodTemplate } if utilfeature.DefaultFeatureGate.Enabled(features.MutableSchedulingDirectivesForSuspendedJobs) { - opts.AllowMutableSchedulingDirectives = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0 + opts.AllowMutableSchedulingDirectives = allowMutablePodTemplate } // Validation should not fail jobs if they don't have the new labels. // This can be removed once we have high confidence that both labels exist (1.30 at least) diff --git a/pkg/registry/batch/job/strategy_test.go b/pkg/registry/batch/job/strategy_test.go index bd859e0703e..b9b1e29cabb 100644 --- a/pkg/registry/batch/job/strategy_test.go +++ b/pkg/registry/batch/job/strategy_test.go @@ -1094,6 +1094,32 @@ func TestJobStrategy_ValidateUpdate_MutablePodResources(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "spec.template.spec"}, }, }, + "feature gate enabled - pod resource update allowed for suspended Job without Suspended condition but never started": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + Active: 0, + StartTime: nil, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.Containers[0].Resources.Requests = api.ResourceList{ + api.ResourceCPU: resource.MustParse("200m"), + } + }, + }, "feature gate enabled - job resuming (suspend=false) but JobSuspended condition still true - resource updates rejected": { enableFeatureGate: true, job: &batch.Job{ @@ -3835,6 +3861,32 @@ func TestJobStrategy_ValidateUpdate_MutableSchedulingDirectives(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "spec.template"}, }, }, + "feature gate enabled - scheduling directives update allowed for suspended Job without Suspended condition but never started": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + Active: 0, + StartTime: nil, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + }, } for name, tc := range cases { diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index c407e11c0ca..79292405ce5 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -4256,6 +4256,79 @@ func TestMutableSchedulingDirectivesForSuspendedJobs(t *testing.T) { } } +// TestMutableSchedulingDirectivesForSuspendedJobsNotYetStarted verifies that +// scheduling directives can be mutated on a suspended Job that has never started, +// even before the JobSuspended condition is set by the job controller. +func TestMutableSchedulingDirectivesForSuspendedJobsNotYetStarted(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutableSchedulingDirectivesForSuspendedJobs, true) + + closeFn, restConfig, clientSet, ns := setup(t, "mutable-scheduling-directives-not-started") + t.Cleanup(closeFn) + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + t.Cleanup(cancel) + + // Create a suspended Job. + job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + + // Immediately update the nodeSelector without waiting for the JobSuspended + // condition to be set. This simulates external controllers (e.g. MultiKueue) + // that inject scheduling directives right after creating a suspended Job. + nodeSelector := map[string]string{"foo": "bar"} + job.Spec.Template.Spec.NodeSelector = nodeSelector + _, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update scheduling directives on suspended Job that was never started: %v", err) + } +} + +// TestMutablePodResourcesForSuspendedJobsNotYetStarted verifies that +// pod resources can be mutated on a suspended Job that has never started, +// even before the JobSuspended condition is set by the job controller. +func TestMutablePodResourcesForSuspendedJobsNotYetStarted(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutablePodResourcesForSuspendedJobs, true) + + closeFn, restConfig, clientSet, ns := setup(t, "mutable-pod-resources-not-started") + t.Cleanup(closeFn) + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + t.Cleanup(cancel) + + // Create a suspended Job. + job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + + // Immediately update the pod resources without waiting for the JobSuspended + // condition to be set. This simulates external controllers (e.g. MultiKueue) + // that inject resource requirements right after creating a suspended Job. + wantResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("200m"), + v1.ResourceMemory: resource.MustParse("256Mi"), + }, + } + job.Spec.Template.Spec.Containers[0].Resources = wantResources + _, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update pod resources on suspended Job that was never started: %v", err) + } +} + type podsByStatus struct { Active int Ready *int32