diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index 6232155658e..8e24b52c4f4 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -128,6 +128,16 @@ func (jobStrategy) Validate(ctx context.Context, obj runtime.Object) field.Error return batchvalidation.ValidateJob(job, opts) } +// shouldAllowMutablePodTemplate returns true if the Job's pod template should +// be mutable. This is safe for suspended jobs with no active pods that either +// have never started or have the JobSuspended condition set. +func shouldAllowMutablePodTemplate(job *batch.Job) bool { + suspended := job.Spec.Suspend != nil && *job.Spec.Suspend + notStarted := job.Status.StartTime == nil + hasSuspendedCondition := batchvalidation.IsConditionTrue(job.Status.Conditions, batch.JobSuspended) + return suspended && (notStarted || hasSuspendedCondition) && job.Status.Active == 0 +} + func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValidationOptions { var newPodTemplate, oldPodTemplate *core.PodTemplateSpec if newJob != nil { @@ -147,11 +157,12 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid suspended := oldJob.Spec.Suspend != nil && *oldJob.Spec.Suspend notStarted := oldJob.Status.StartTime == nil opts.AllowMutableSchedulingDirectives = suspended && notStarted + allowMutablePodTemplate := shouldAllowMutablePodTemplate(oldJob) if utilfeature.DefaultFeatureGate.Enabled(features.MutablePodResourcesForSuspendedJobs) { - opts.AllowMutablePodResources = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0 + opts.AllowMutablePodResources = allowMutablePodTemplate } if utilfeature.DefaultFeatureGate.Enabled(features.MutableSchedulingDirectivesForSuspendedJobs) { - opts.AllowMutableSchedulingDirectives = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0 + opts.AllowMutableSchedulingDirectives = allowMutablePodTemplate } // Validation should not fail jobs if they don't have the new labels. // This can be removed once we have high confidence that both labels exist (1.30 at least) diff --git a/pkg/registry/batch/job/strategy_test.go b/pkg/registry/batch/job/strategy_test.go index bd859e0703e..b9b1e29cabb 100644 --- a/pkg/registry/batch/job/strategy_test.go +++ b/pkg/registry/batch/job/strategy_test.go @@ -1094,6 +1094,32 @@ func TestJobStrategy_ValidateUpdate_MutablePodResources(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "spec.template.spec"}, }, }, + "feature gate enabled - pod resource update allowed for suspended Job without Suspended condition but never started": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + Active: 0, + StartTime: nil, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.Containers[0].Resources.Requests = api.ResourceList{ + api.ResourceCPU: resource.MustParse("200m"), + } + }, + }, "feature gate enabled - job resuming (suspend=false) but JobSuspended condition still true - resource updates rejected": { enableFeatureGate: true, job: &batch.Job{ @@ -3835,6 +3861,32 @@ func TestJobStrategy_ValidateUpdate_MutableSchedulingDirectives(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "spec.template"}, }, }, + "feature gate enabled - scheduling directives update allowed for suspended Job without Suspended condition but never started": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + Active: 0, + StartTime: nil, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + }, } for name, tc := range cases { diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index c407e11c0ca..79292405ce5 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -4256,6 +4256,79 @@ func TestMutableSchedulingDirectivesForSuspendedJobs(t *testing.T) { } } +// TestMutableSchedulingDirectivesForSuspendedJobsNotYetStarted verifies that +// scheduling directives can be mutated on a suspended Job that has never started, +// even before the JobSuspended condition is set by the job controller. +func TestMutableSchedulingDirectivesForSuspendedJobsNotYetStarted(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutableSchedulingDirectivesForSuspendedJobs, true) + + closeFn, restConfig, clientSet, ns := setup(t, "mutable-scheduling-directives-not-started") + t.Cleanup(closeFn) + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + t.Cleanup(cancel) + + // Create a suspended Job. + job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + + // Immediately update the nodeSelector without waiting for the JobSuspended + // condition to be set. This simulates external controllers (e.g. MultiKueue) + // that inject scheduling directives right after creating a suspended Job. + nodeSelector := map[string]string{"foo": "bar"} + job.Spec.Template.Spec.NodeSelector = nodeSelector + _, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update scheduling directives on suspended Job that was never started: %v", err) + } +} + +// TestMutablePodResourcesForSuspendedJobsNotYetStarted verifies that +// pod resources can be mutated on a suspended Job that has never started, +// even before the JobSuspended condition is set by the job controller. +func TestMutablePodResourcesForSuspendedJobsNotYetStarted(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutablePodResourcesForSuspendedJobs, true) + + closeFn, restConfig, clientSet, ns := setup(t, "mutable-pod-resources-not-started") + t.Cleanup(closeFn) + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + t.Cleanup(cancel) + + // Create a suspended Job. + job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + + // Immediately update the pod resources without waiting for the JobSuspended + // condition to be set. This simulates external controllers (e.g. MultiKueue) + // that inject resource requirements right after creating a suspended Job. + wantResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("200m"), + v1.ResourceMemory: resource.MustParse("256Mi"), + }, + } + job.Spec.Template.Spec.Containers[0].Resources = wantResources + _, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update pod resources on suspended Job that was never started: %v", err) + } +} + type podsByStatus struct { Active int Ready *int32