Merge pull request #139287 from kannon92/issue-139281

batch/job: Fix scheduling directives mutation for not-yet-started suspended Jobs
This commit is contained in:
Kubernetes Prow Robot 2026-05-27 20:57:07 +05:30 committed by GitHub
commit d96ef3cd6a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 138 additions and 2 deletions

View file

@ -128,6 +128,16 @@ func (jobStrategy) Validate(ctx context.Context, obj runtime.Object) field.Error
return batchvalidation.ValidateJob(job, opts)
}
// shouldAllowMutablePodTemplate returns true if the Job's pod template should
// be mutable. This is safe for suspended jobs with no active pods that either
// have never started or have the JobSuspended condition set.
func shouldAllowMutablePodTemplate(job *batch.Job) bool {
suspended := job.Spec.Suspend != nil && *job.Spec.Suspend
notStarted := job.Status.StartTime == nil
hasSuspendedCondition := batchvalidation.IsConditionTrue(job.Status.Conditions, batch.JobSuspended)
return suspended && (notStarted || hasSuspendedCondition) && job.Status.Active == 0
}
func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValidationOptions {
var newPodTemplate, oldPodTemplate *core.PodTemplateSpec
if newJob != nil {
@ -147,11 +157,12 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid
suspended := oldJob.Spec.Suspend != nil && *oldJob.Spec.Suspend
notStarted := oldJob.Status.StartTime == nil
opts.AllowMutableSchedulingDirectives = suspended && notStarted
allowMutablePodTemplate := shouldAllowMutablePodTemplate(oldJob)
if utilfeature.DefaultFeatureGate.Enabled(features.MutablePodResourcesForSuspendedJobs) {
opts.AllowMutablePodResources = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0
opts.AllowMutablePodResources = allowMutablePodTemplate
}
if utilfeature.DefaultFeatureGate.Enabled(features.MutableSchedulingDirectivesForSuspendedJobs) {
opts.AllowMutableSchedulingDirectives = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0
opts.AllowMutableSchedulingDirectives = allowMutablePodTemplate
}
// Validation should not fail jobs if they don't have the new labels.
// This can be removed once we have high confidence that both labels exist (1.30 at least)

View file

@ -1094,6 +1094,32 @@ func TestJobStrategy_ValidateUpdate_MutablePodResources(t *testing.T) {
{Type: field.ErrorTypeInvalid, Field: "spec.template.spec"},
},
},
"feature gate enabled - pod resource update allowed for suspended Job without Suspended condition but never started": {
enableFeatureGate: true,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
Active: 0,
StartTime: nil,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Containers[0].Resources.Requests = api.ResourceList{
api.ResourceCPU: resource.MustParse("200m"),
}
},
},
"feature gate enabled - job resuming (suspend=false) but JobSuspended condition still true - resource updates rejected": {
enableFeatureGate: true,
job: &batch.Job{
@ -3835,6 +3861,32 @@ func TestJobStrategy_ValidateUpdate_MutableSchedulingDirectives(t *testing.T) {
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
},
"feature gate enabled - scheduling directives update allowed for suspended Job without Suspended condition but never started": {
enableFeatureGate: true,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
Active: 0,
StartTime: nil,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
},
}
for name, tc := range cases {

View file

@ -4256,6 +4256,79 @@ func TestMutableSchedulingDirectivesForSuspendedJobs(t *testing.T) {
}
}
// TestMutableSchedulingDirectivesForSuspendedJobsNotYetStarted verifies that
// scheduling directives can be mutated on a suspended Job that has never started,
// even before the JobSuspended condition is set by the job controller.
func TestMutableSchedulingDirectivesForSuspendedJobsNotYetStarted(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutableSchedulingDirectivesForSuspendedJobs, true)
closeFn, restConfig, clientSet, ns := setup(t, "mutable-scheduling-directives-not-started")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
// Create a suspended Job.
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
Completions: ptr.To[int32](1),
Suspend: ptr.To(true),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
// Immediately update the nodeSelector without waiting for the JobSuspended
// condition to be set. This simulates external controllers (e.g. MultiKueue)
// that inject scheduling directives right after creating a suspended Job.
nodeSelector := map[string]string{"foo": "bar"}
job.Spec.Template.Spec.NodeSelector = nodeSelector
_, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update scheduling directives on suspended Job that was never started: %v", err)
}
}
// TestMutablePodResourcesForSuspendedJobsNotYetStarted verifies that
// pod resources can be mutated on a suspended Job that has never started,
// even before the JobSuspended condition is set by the job controller.
func TestMutablePodResourcesForSuspendedJobsNotYetStarted(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutablePodResourcesForSuspendedJobs, true)
closeFn, restConfig, clientSet, ns := setup(t, "mutable-pod-resources-not-started")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
// Create a suspended Job.
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
Completions: ptr.To[int32](1),
Suspend: ptr.To(true),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
// Immediately update the pod resources without waiting for the JobSuspended
// condition to be set. This simulates external controllers (e.g. MultiKueue)
// that inject resource requirements right after creating a suspended Job.
wantResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("256Mi"),
},
}
job.Spec.Template.Spec.Containers[0].Resources = wantResources
_, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update pod resources on suspended Job that was never started: %v", err)
}
}
type podsByStatus struct {
Active int
Ready *int32