diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index cd908de599c..1ace380fc12 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -1047,6 +1047,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if isUpdated { suspendCondChanged = true jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended") + if feature.DefaultFeatureGate.Enabled(features.MutableSchedulingDirectivesForSuspendedJobs) { + job.Status.StartTime = nil + } } } else { // Job not suspended. diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 5b31f00f460..ce26a4ec706 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -602,6 +602,12 @@ const ( // Enables mutable pod resources for suspended Jobs, regardless of whether they have started before. MutablePodResourcesForSuspendedJobs featuregate.Feature = "MutablePodResourcesForSuspendedJobs" + // owner: @mimowo + // kep: https://kep.k8s.io/5440 + // + // Enables mutable scheduling directives for suspended Jobs, regardless of whether they have started before. 
+ MutableSchedulingDirectivesForSuspendedJobs featuregate.Feature = "MutableSchedulingDirectivesForSuspendedJobs" + // owner: @danwinship // kep: https://kep.k8s.io/3866 // @@ -1472,6 +1478,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha}, }, + MutableSchedulingDirectivesForSuspendedJobs: { + {Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha}, + }, + NFTablesProxyMode: { {Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Alpha}, {Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta}, @@ -2226,6 +2236,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature MutablePodResourcesForSuspendedJobs: {}, + MutableSchedulingDirectivesForSuspendedJobs: {}, + NFTablesProxyMode: {}, NodeInclusionPolicyInPodTopologySpread: {}, diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index fc6d9c5505d..d0b9f4f3d8a 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -194,6 +194,9 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid if utilfeature.DefaultFeatureGate.Enabled(features.MutablePodResourcesForSuspendedJobs) { opts.AllowMutablePodResources = batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0 } + if utilfeature.DefaultFeatureGate.Enabled(features.MutableSchedulingDirectivesForSuspendedJobs) { + opts.AllowMutableSchedulingDirectives = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0 + } // Validation should not fail jobs if they don't have the new labels. 
// This can be removed once we have high confidence that both labels exist (1.30 at least) _, hadJobName := oldJob.Spec.Template.Labels[batch.JobNameLabel] diff --git a/pkg/registry/batch/job/strategy_test.go b/pkg/registry/batch/job/strategy_test.go index 4058d749eef..58da8cd5a57 100644 --- a/pkg/registry/batch/job/strategy_test.go +++ b/pkg/registry/batch/job/strategy_test.go @@ -3973,6 +3973,304 @@ func TestJobStrategy_GetAttrs(t *testing.T) { } } +// TestJobStrategy_ValidateUpdate_MutableSchedulingDirectives checks whether Strategy.ValidateUpdate +// accepts or rejects a nodeSelector change in the Job's pod template, depending on the +// MutableSchedulingDirectivesForSuspendedJobs feature gate and on the old Job's spec.suspend and status. +func TestJobStrategy_ValidateUpdate_MutableSchedulingDirectives(t *testing.T) { + ctx := genericapirequest.NewDefaultContext() + now := metav1.Now() + validSelector := &metav1.LabelSelector{ + MatchLabels: map[string]string{"a": "b"}, + } + validPodTemplateSpec := api.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: validSelector.MatchLabels, + }, + Spec: podtest.MakePodSpec(podtest.SetRestartPolicy(api.RestartPolicyOnFailure)), + } + + cases := map[string]struct { + enableFeatureGate bool + job *batch.Job + update func(*batch.Job) + wantErrs field.ErrorList + }{ + "feature gate disabled - scheduling directives update allowed for suspended job never started": { + enableFeatureGate: false, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + Active: 0, + StartTime: nil, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: api.ConditionStatus(metav1.ConditionTrue), + }, + }, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + }, + "feature gate enabled - scheduling directives update allowed for suspended job never started": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + 
Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + Active: 0, + StartTime: nil, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: api.ConditionStatus(metav1.ConditionTrue), + }, + }, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + }, + "feature gate disabled - scheduling directives update rejected for unsuspended Job": { + enableFeatureGate: false, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(false), // running Job: spec.suspend is false so the fixture matches the case name + }, + Status: batch.JobStatus{ + Active: 1, + StartTime: &now, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "spec.template"}, + }, + }, + "feature gate enabled - scheduling directives update rejected for unsuspended Job": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(false), + }, + Status: batch.JobStatus{ + Active: 1, + StartTime: &now, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "spec.template"}, + }, + }, 
+ "feature gate disabled - scheduling directives update rejected for suspended Job which was running in the past": { + enableFeatureGate: false, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + StartTime: &now, + Active: 0, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: api.ConditionStatus(metav1.ConditionTrue), + }, + }, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "spec.template"}, + }, + }, + "feature gate enabled - scheduling directives update allowed for suspended Job which was running in the past": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + StartTime: &now, + Active: 0, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: api.ConditionStatus(metav1.ConditionTrue), + }, + }, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + }, + "feature gate enabled - scheduling directives update rejected for suspended Job still have active pods": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: 
validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + StartTime: &now, + Active: 1, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: api.ConditionStatus(metav1.ConditionTrue), + }, + }, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "spec.template"}, + }, + }, + "feature gate enabled - scheduling directives update rejected for suspended Job without Suspended condition": { + enableFeatureGate: true, + job: &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjob", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "0", + }, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validPodTemplateSpec, + ManualSelector: ptr.To(true), + Parallelism: ptr.To[int32](1), + Suspend: ptr.To(true), + }, + Status: batch.JobStatus{ + StartTime: &now, + Active: 0, // no active Pods, so only the JobSuspended=False condition can explain the rejection + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: api.ConditionStatus(metav1.ConditionFalse), + }, + }, + }, + }, + update: func(job *batch.Job) { + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "spec.template"}, + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, utilversion.MustParse("1.35")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MutableSchedulingDirectivesForSuspendedJobs, tc.enableFeatureGate) + + newJob := tc.job.DeepCopy() + tc.update(newJob) + errs := Strategy.ValidateUpdate(ctx, newJob, tc.job) + if diff := cmp.Diff(tc.wantErrs, errs, ignoreErrValueDetail); diff != "" { + t.Errorf("Unexpected errors (-want,+got):\n%s", 
diff) + } + }) + } +} + func TestJobToSelectiableFields(t *testing.T) { apitesting.TestSelectableFieldLabelConversionsOfKind(t, "batch/v1", diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index a532cec975f..59c9e9ec01d 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1071,6 +1071,12 @@ lockToDefault: false preRelease: Alpha version: "1.35" +- name: MutableSchedulingDirectivesForSuspendedJobs + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.35" - name: MutatingAdmissionPolicy versionedSpecs: - default: false diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index c661c9068a3..798ab81a925 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -4614,6 +4614,118 @@ func TestDelayedJobUpdateEvent(t *testing.T) { } +// TestMutableSchedulingDirectivesForSuspendedJobs verifies that scheduling directives +// can be mutated when the feature is enabled for a suspended Job. 
+func TestMutableSchedulingDirectivesForSuspendedJobs(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutableSchedulingDirectivesForSuspendedJobs, true) + + closeFn, restConfig, clientSet, ns := setup(t, "mutable-scheduling-directives-for-suspended-jobs") + t.Cleanup(closeFn) + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + t.Cleanup(cancel) + + job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](2), + Suspend: ptr.To(false), + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + + // wait for the Job to be running and the status.startTime to be set + validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{ + Active: 1, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + }) + + job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get Job: %v", err) + } + if job.Status.StartTime == nil { + t.Fatalf("Job startTime was not set") + } + + // verify that an update to an unsuspended Job fails + jobCopy := job.DeepCopy() + jobCopy.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + + _, err = clientSet.BatchV1().Jobs(job.Namespace).Update(ctx, jobCopy, metav1.UpdateOptions{}) + if err == nil { + t.Fatalf("update of scheduling directives succeeded for unsuspended job %s", klog.KObj(jobCopy)) + } + + // suspend the Job + job.Spec.Suspend = ptr.To(true) + job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to suspend Job: %v", err) + } + + // wait for the Job to have no active Pods and to have the JobSuspended condition + validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{ + Active: 0, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + }) + + job, err = 
clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get Job: %v", err) + } + + if job.Status.StartTime != nil { + t.Fatalf("Job startTime was not cleaned") + } + + if getJobConditionStatus(ctx, job, batchv1.JobSuspended) != v1.ConditionTrue { + t.Fatalf("JobSuspended condition was not set to True") + } + + // since the Job is suspended the update is now accepted + job.Spec.Template.Spec.NodeSelector = map[string]string{ + "key": "value", + } + + job, err = clientSet.BatchV1().Jobs(job.Namespace).Update(ctx, job, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("update of scheduling directives failed for suspended job: %v", err) // job was just reassigned from the failed Update, so report err + } + + // resume the Job again and the Pods get created + job.Spec.Suspend = ptr.To(false) + job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to resume Job: %v", err) + } + + // wait for the Job to be running again and assert on the status + validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{ + Active: 1, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + }) + + job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get Job: %v", err) + } + + if getJobConditionStatus(ctx, job, batchv1.JobSuspended) != v1.ConditionFalse { + t.Error("JobSuspended condition was not set to False") + } + + if job.Status.StartTime == nil { + t.Error("Job startTime was not set after resume") + } +} + type podsByStatus struct { Active int Ready *int32