diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 3159c6eff76..74b8380f1a4 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -1505,10 +1505,12 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 
 	MutablePodResourcesForSuspendedJobs: {
 		{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
+		{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
 	},
 
 	MutableSchedulingDirectivesForSuspendedJobs: {
 		{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
+		{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
 	},
 
 	NFTablesProxyMode: {
diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md
index 2bf77fb7850..f1cfc78dc2e 100644
--- a/test/compatibility_lifecycle/reference/feature_list.md
+++ b/test/compatibility_lifecycle/reference/feature_list.md
@@ -120,8 +120,8 @@
 | MultiCIDRServiceAllocator | :ballot_box_with_check: 1.33+ | :closed_lock_with_key: 1.34+ | 1.27-1.30 | 1.31-1.32 | 1.33- | | | [code](https://cs.k8s.io/?q=%5CbMultiCIDRServiceAllocator%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMultiCIDRServiceAllocator%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
 | MutableCSINodeAllocatableCount | :ballot_box_with_check: 1.35+ | | 1.33 | 1.34- | | | | [code](https://cs.k8s.io/?q=%5CbMutableCSINodeAllocatableCount%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutableCSINodeAllocatableCount%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
 | MutablePVNodeAffinity | | | 1.35- | | | | | [code](https://cs.k8s.io/?q=%5CbMutablePVNodeAffinity%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutablePVNodeAffinity%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
-| MutablePodResourcesForSuspendedJobs | | | 1.35- | | | | | [code](https://cs.k8s.io/?q=%5CbMutablePodResourcesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutablePodResourcesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
-| MutableSchedulingDirectivesForSuspendedJobs | | | 1.35- | | | | | [code](https://cs.k8s.io/?q=%5CbMutableSchedulingDirectivesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutableSchedulingDirectivesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
+| MutablePodResourcesForSuspendedJobs | :ballot_box_with_check: 1.36+ | | 1.35 | 1.36- | | | | [code](https://cs.k8s.io/?q=%5CbMutablePodResourcesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutablePodResourcesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
+| MutableSchedulingDirectivesForSuspendedJobs | :ballot_box_with_check: 1.36+ | | 1.35 | 1.36- | | | | [code](https://cs.k8s.io/?q=%5CbMutableSchedulingDirectivesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutableSchedulingDirectivesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
 | MutatingAdmissionPolicy | | | 1.32-1.33 | 1.34- | | | | [code](https://cs.k8s.io/?q=%5CbMutatingAdmissionPolicy%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutatingAdmissionPolicy%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
 | NFTablesProxyMode | :ballot_box_with_check: 1.31+ | :closed_lock_with_key: 1.33+ | 1.29-1.30 | 1.31-1.32 | 1.33- | | | [code](https://cs.k8s.io/?q=%5CbNFTablesProxyMode%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbNFTablesProxyMode%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
 | NodeDeclaredFeatures | | | 1.35- | | | | | [code](https://cs.k8s.io/?q=%5CbNodeDeclaredFeatures%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbNodeDeclaredFeatures%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml
index abffb3fe4d8..678d44c8ebf 100644
--- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml
+++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml
@@ -1097,6 +1097,10 @@
     lockToDefault: false
     preRelease: Alpha
     version: "1.35"
+  - default: true
+    lockToDefault: false
+    preRelease: Beta
+    version: "1.36"
 - name: MutablePVNodeAffinity
   versionedSpecs:
   - default: false
@@ -1109,6 +1113,10 @@
     lockToDefault: false
     preRelease: Alpha
     version: "1.35"
+  - default: true
+    lockToDefault: false
+    preRelease: Beta
+    version: "1.36"
 - name: MutatingAdmissionPolicy
   versionedSpecs:
   - default: false
diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go
index f700e8bb5b3..76830110160 100644
--- a/test/e2e/apps/job.go
+++ b/test/e2e/apps/job.go
@@ -1377,6 +1377,203 @@ done`}
 		gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0)))
 		gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0)))
 	})
+
+	/*
+		Testname: Allow updating pod resources for suspended Jobs
+		Description: Create a suspended Job with initial container resources.
+		Update the container resources while the job is suspended.
+		Unsuspend the job and verify that pods are created with the updated resources.
+		This verifies KEP-5440: Mutable Job Pod Resource Updates.
+	*/
+	framework.It("should allow updating pod resources for a suspended job", framework.WithFeatureGate(features.MutablePodResourcesForSuspendedJobs), func(ctx context.Context) {
+		jobName := "e2e-mutable-resources" + utilrand.String(5)
+
+		parallelism := int32(1)
+		completions := int32(1)
+		backoffLimit := int32(6)
+
+		initialCPU := resource.MustParse("100m")
+		initialMemory := resource.MustParse("128Mi")
+		updatedCPU := resource.MustParse("200m")
+		updatedMemory := resource.MustParse("256Mi")
+
+		ginkgo.By("Creating a suspended job with initial resources")
+		job := e2ejob.NewTestJob("succeed", jobName, v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
+		job.Spec.Suspend = ptr.To(true)
+		for i := range job.Spec.Template.Spec.Containers {
+			job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceCPU:    initialCPU,
+					v1.ResourceMemory: initialMemory,
+				},
+			}
+		}
+		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
+
+		ginkgo.By("Verifying job is suspended and no pods are created")
+		err = e2ejob.WaitForJobSuspend(ctx, f.ClientSet, f.Namespace.Name, jobName)
+		framework.ExpectNoError(err, "failed to verify job is suspended")
+		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, jobName)
+		framework.ExpectNoError(err, "failed to get pods for job")
+		gomega.Expect(pods.Items).To(gomega.BeEmpty(), "expected no pods while job is suspended")
+
+		ginkgo.By("Updating container resources while job is suspended")
+		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, jobName)
+			if err != nil {
+				return err
+			}
+			for i := range job.Spec.Template.Spec.Containers {
+				job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{
+					Requests: v1.ResourceList{
+						v1.ResourceCPU:    updatedCPU,
+						v1.ResourceMemory: updatedMemory,
+					},
+				}
+			}
+			job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+			return err
+		})
+		framework.ExpectNoError(err, "failed to update job resources")
+
+		ginkgo.By("Unsuspending the job")
+		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, jobName)
+			if err != nil {
+				return err
+			}
+			job.Spec.Suspend = ptr.To(false)
+			job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+			return err
+		})
+		framework.ExpectNoError(err, "failed to unsuspend job")
+
+		ginkgo.By("Waiting for job to complete")
+		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, jobName, batchv1.JobReasonCompletionsReached, completions)
+		framework.ExpectNoError(err, "failed to wait for job completion")
+
+		ginkgo.By("Verifying pods were created with updated resources")
+		pods, err = e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, jobName)
+		framework.ExpectNoError(err, "failed to get pods for job")
+		gomega.Expect(pods.Items).NotTo(gomega.BeEmpty(), "expected at least one pod")
+
+		for _, pod := range pods.Items {
+			for _, container := range pod.Spec.Containers {
+				cpuRequest := container.Resources.Requests[v1.ResourceCPU]
+				memoryRequest := container.Resources.Requests[v1.ResourceMemory]
+				gomega.Expect(cpuRequest.Equal(updatedCPU)).To(gomega.BeTrueBecause(
+					"expected CPU request %v, got %v", updatedCPU.String(), cpuRequest.String()))
+				gomega.Expect(memoryRequest.Equal(updatedMemory)).To(gomega.BeTrueBecause(
+					"expected memory request %v, got %v", updatedMemory.String(), memoryRequest.String()))
+			}
+		}
+	})
+
+	/*
+		Testname: Allow updating pod resources for a job that started and was suspended
+		Description: Create a job that starts running, suspend it, update the
+		container resources while suspended, then unsuspend and verify that newly
+		created pods have the updated resources. This verifies that resource updates
+		are allowed even for jobs that have previously started.
+	*/
+	framework.It("should allow updating pod resources for a job that started and then was suspended", framework.WithFeatureGate(features.MutablePodResourcesForSuspendedJobs), func(ctx context.Context) {
+		jobName := "e2e-start-suspend" + utilrand.String(5)
+
+		parallelism := int32(2)
+		completions := int32(4)
+		backoffLimit := int32(6)
+
+		initialCPU := resource.MustParse("100m")
+		initialMemory := resource.MustParse("128Mi")
+		updatedCPU := resource.MustParse("200m")
+		updatedMemory := resource.MustParse("256Mi")
+
+		ginkgo.By("Creating a running job with initial resources")
+		job := e2ejob.NewTestJob("notTerminate", jobName, v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
+		job.Spec.Suspend = ptr.To(false)
+		for i := range job.Spec.Template.Spec.Containers {
+			job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceCPU:    initialCPU,
+					v1.ResourceMemory: initialMemory,
+				},
+			}
+		}
+		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
+
+		ginkgo.By("Waiting for pods to be running")
+		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, jobName, parallelism)
+		framework.ExpectNoError(err, "failed to wait for job pods to be running")
+
+		ginkgo.By("Suspending the running job")
+		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, jobName)
+			if err != nil {
+				return err
+			}
+			job.Spec.Suspend = ptr.To(true)
+			job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+			return err
+		})
+		framework.ExpectNoError(err, "failed to suspend job")
+
+		ginkgo.By("Waiting for all pods to be deleted after suspension")
+		err = e2ejob.WaitForAllJobPodsGone(ctx, f.ClientSet, f.Namespace.Name, jobName)
+		framework.ExpectNoError(err, "failed to wait for pods to be deleted after suspension")
+
+		ginkgo.By("Updating container resources while job is suspended")
+		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, jobName)
+			if err != nil {
+				return err
+			}
+			for i := range job.Spec.Template.Spec.Containers {
+				job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{
+					Requests: v1.ResourceList{
+						v1.ResourceCPU:    updatedCPU,
+						v1.ResourceMemory: updatedMemory,
+					},
+				}
+			}
+			job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+			return err
+		})
+		framework.ExpectNoError(err, "failed to update job resources")
+
+		ginkgo.By("Unsuspending the job")
+		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, jobName)
+			if err != nil {
+				return err
+			}
+			job.Spec.Suspend = ptr.To(false)
+			job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+			return err
+		})
+		framework.ExpectNoError(err, "failed to unsuspend job")
+
+		ginkgo.By("Waiting for new pods to be running with updated resources")
+		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, jobName, parallelism)
+		framework.ExpectNoError(err, "failed to wait for job pods to be running after unsuspending")
+
+		ginkgo.By("Verifying newly created pods have updated resources")
+		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, jobName)
+		framework.ExpectNoError(err, "failed to get pods for job")
+		gomega.Expect(pods.Items).NotTo(gomega.BeEmpty(), "expected at least one pod")
+
+		for _, pod := range pods.Items {
+			for _, container := range pod.Spec.Containers {
+				cpuRequest := container.Resources.Requests[v1.ResourceCPU]
+				memoryRequest := container.Resources.Requests[v1.ResourceMemory]
+				gomega.Expect(cpuRequest.Equal(updatedCPU)).To(gomega.BeTrueBecause(
+					"expected CPU request %v, got %v", updatedCPU.String(), cpuRequest.String()))
+				gomega.Expect(memoryRequest.Equal(updatedMemory)).To(gomega.BeTrueBecause(
+					"expected memory request %v, got %v", updatedMemory.String(), memoryRequest.String()))
+			}
+		}
+	})
 })
 
 func updateJobSuspendWithRetries(ctx context.Context, f *framework.Framework, job *batchv1.Job, suspend *bool) error {
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 191bf8946f4..b1e5e201c17 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -4228,6 +4228,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
 
 func TestNodeSelectorUpdate(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
+	t.Cleanup(closeFn)
 	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	t.Cleanup(cancel)
 
@@ -4243,7 +4244,14 @@ func TestNodeSelectorUpdate(t *testing.T) {
 	jobNamespace := job.Namespace
 	jobClient := clientSet.BatchV1().Jobs(jobNamespace)
 
-	// (1) Unsuspend and set node selector in the same update.
+	// Since MutableSchedulingDirectivesForSuspendedJobs is enabled by default,
+	// one needs to wait for the suspend condition to be set, reflecting that
+	// the job is actually suspended.
+	waitForPodsToBeActive(ctx, t, jobClient, 0, job)
+	validateJobCondition(ctx, t, clientSet, job, batchv1.JobSuspended)
+
+	// (1) Set the node selector while the job is suspended.
+	nodeSelector := map[string]string{"foo": "bar"}
 	if _, err := updateJob(ctx, jobClient, jobName, func(j *batchv1.Job) {
 		j.Spec.Template.Spec.NodeSelector = nodeSelector