Merge pull request #135104 from mimowo/mutable-job-directives

Allow mutable job scheduling directives on suspended Jobs
This commit is contained in:
Kubernetes Prow Robot 2025-11-05 21:57:11 -08:00 committed by GitHub
commit ca03752ee7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 431 additions and 0 deletions

View file

@ -1047,6 +1047,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if isUpdated {
suspendCondChanged = true
jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
if feature.DefaultFeatureGate.Enabled(features.MutableSchedulingDirectivesForSuspendedJobs) {
job.Status.StartTime = nil
}
}
} else {
// Job not suspended.

View file

@ -602,6 +602,12 @@ const (
// Enables mutable pod resources for suspended Jobs, regardless of whether they have started before.
MutablePodResourcesForSuspendedJobs featuregate.Feature = "MutablePodResourcesForSuspendedJobs"
// owner: @mimowo
// kep: https://kep.k8s.io/5440
//
// Enables mutable scheduling directives for suspended Jobs, regardless of whether they have started before.
MutableSchedulingDirectivesForSuspendedJobs featuregate.Feature = "MutableSchedulingDirectivesForSuspendedJobs"
// owner: @danwinship
// kep: https://kep.k8s.io/3866
//
@ -1472,6 +1478,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
},
MutableSchedulingDirectivesForSuspendedJobs: {
{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
},
NFTablesProxyMode: {
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta},
@ -2226,6 +2236,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
MutablePodResourcesForSuspendedJobs: {},
MutableSchedulingDirectivesForSuspendedJobs: {},
NFTablesProxyMode: {},
NodeInclusionPolicyInPodTopologySpread: {},

View file

@ -194,6 +194,9 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid
if utilfeature.DefaultFeatureGate.Enabled(features.MutablePodResourcesForSuspendedJobs) {
opts.AllowMutablePodResources = batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0
}
if utilfeature.DefaultFeatureGate.Enabled(features.MutableSchedulingDirectivesForSuspendedJobs) {
opts.AllowMutableSchedulingDirectives = suspended && batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) && oldJob.Status.Active == 0
}
// Validation should not fail jobs if they don't have the new labels.
// This can be removed once we have high confidence that both labels exist (1.30 at least)
_, hadJobName := oldJob.Spec.Template.Labels[batch.JobNameLabel]

View file

@ -3973,6 +3973,301 @@ func TestJobStrategy_GetAttrs(t *testing.T) {
}
}
func TestJobStrategy_ValidateUpdate_MutableSchedulingDirectives(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
now := metav1.Now()
validSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{"a": "b"},
}
validPodTemplateSpec := api.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: validSelector.MatchLabels,
},
Spec: podtest.MakePodSpec(podtest.SetRestartPolicy(api.RestartPolicyOnFailure)),
}
cases := map[string]struct {
enableFeatureGate bool
job *batch.Job
update func(*batch.Job)
wantErrs field.ErrorList
}{
"feature gate disabled - scheduling directives update allowed for suspended job never started": {
enableFeatureGate: false,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
Active: 0,
StartTime: nil,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionStatus(metav1.ConditionTrue),
},
},
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
},
"feature gate enabled - scheduling directives update allowed for suspended job never started": {
enableFeatureGate: true,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
Active: 0,
StartTime: nil,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionStatus(metav1.ConditionTrue),
},
},
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
},
"feature gate disabled - scheduling directives update rejected for unsuspended Job": {
enableFeatureGate: false,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
Active: 1,
StartTime: &now,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
},
"feature gate enabled - scheduling directives update rejected for unsuspended Job": {
enableFeatureGate: true,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(false),
},
Status: batch.JobStatus{
Active: 1,
StartTime: &now,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
},
"feature gate disabled - scheduling directives update rejected for suspended Job which was running in the past": {
enableFeatureGate: false,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
StartTime: &now,
Active: 0,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionStatus(metav1.ConditionTrue),
},
},
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
},
"feature gate enabled - scheduling directives update allowed for suspended Job which was running in the past": {
enableFeatureGate: true,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
StartTime: &now,
Active: 0,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionStatus(metav1.ConditionTrue),
},
},
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
},
"feature gate enabled - scheduling directives update rejected for suspended Job still have active pods": {
enableFeatureGate: true,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
StartTime: &now,
Active: 1,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionStatus(metav1.ConditionTrue),
},
},
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
},
"feature gate enabled - scheduling directives update rejected for suspended Job without Suspended condition": {
enableFeatureGate: true,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: ptr.To(true),
Parallelism: ptr.To[int32](1),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
StartTime: &now,
Active: 1,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionStatus(metav1.ConditionFalse),
},
},
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, utilversion.MustParse("1.35"))
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MutableSchedulingDirectivesForSuspendedJobs, tc.enableFeatureGate)
newJob := tc.job.DeepCopy()
tc.update(newJob)
errs := Strategy.ValidateUpdate(ctx, newJob, tc.job)
if diff := cmp.Diff(tc.wantErrs, errs, ignoreErrValueDetail); diff != "" {
t.Errorf("Unexpected errors (-want,+got):\n%s", diff)
}
})
}
}
func TestJobToSelectiableFields(t *testing.T) {
apitesting.TestSelectableFieldLabelConversionsOfKind(t,
"batch/v1",

View file

@ -1071,6 +1071,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.35"
- name: MutableSchedulingDirectivesForSuspendedJobs
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.35"
- name: MutatingAdmissionPolicy
versionedSpecs:
- default: false

View file

@ -4614,6 +4614,118 @@ func TestDelayedJobUpdateEvent(t *testing.T) {
}
// TestMutableSchedulingDirectivesForSuspendedJobs verifies that scheduling directives
// can be mutated when the feature is enabled for a suspended JOb.
func TestMutableSchedulingDirectivesForSuspendedJobs(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.MutableSchedulingDirectivesForSuspendedJobs, true)
closeFn, restConfig, clientSet, ns := setup(t, "mutable-scheduling-directives-for-suspended-jobs")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
Completions: ptr.To[int32](2),
Suspend: ptr.To(false),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
// await for the Job to be running and the status.startTime set
validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
}
if job.Status.StartTime == nil {
t.Fatalf("Job startTime was not set")
}
// verify an update to an unsuspended Job would fail
jobCopy := job.DeepCopy()
jobCopy.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
_, err = clientSet.BatchV1().Jobs(job.Namespace).Update(ctx, jobCopy, metav1.UpdateOptions{})
if err == nil {
t.Fatalf("update of scheduling directives succeeded for unsuspended job %s", klog.KObj(jobCopy))
}
// suspend the Job
job.Spec.Suspend = ptr.To(true)
job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to suspend Job: %v", err)
}
// await for the Job to have no active Pods and have the JobSuspended condition
validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{
Active: 0,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
}
if job.Status.StartTime != nil {
t.Fatalf("Job startTime was not cleaned")
}
if getJobConditionStatus(ctx, job, batchv1.JobSuspended) != v1.ConditionTrue {
t.Fatalf("JobSuspended condition was not set to True")
}
// since the Job is suspended the update is now accepted
job.Spec.Template.Spec.NodeSelector = map[string]string{
"key": "value",
}
job, err = clientSet.BatchV1().Jobs(job.Namespace).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("update of scheduling directives failed for suspended job %s", klog.KObj(job))
}
// resume the Job again and the Pods get created
job.Spec.Suspend = ptr.To(false)
job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to resume Job: %v", err)
}
// await for the Job to be running again and assert on the status
validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
}
if getJobConditionStatus(ctx, job, batchv1.JobSuspended) != v1.ConditionFalse {
t.Error("JobSuspended condition was not set to False")
}
if job.Status.StartTime == nil {
t.Error("Job startTime was not set after resume")
}
}
type podsByStatus struct {
Active int
Ready *int32