fix: allow job startTime updates on resume from suspended state

Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>
This commit is contained in:
Dejan Zele Pejchev 2025-10-22 08:26:54 +02:00
parent 7d353c5249
commit bbd44717c1
No known key found for this signature in database
GPG key ID: 8A900F09C964845E
4 changed files with 121 additions and 1 deletions

View file

@ -914,6 +914,15 @@ func IsConditionTrue(list []batch.JobCondition, cType batch.JobConditionType) bo
return false
}
func IsConditionFalse(list []batch.JobCondition, cType batch.JobConditionType) bool {
for _, c := range list {
if c.Type == cType && c.Status == api.ConditionFalse {
return true
}
}
return false
}
func validateFailedIndexesNotOverlapCompleted(completedIndexesStr string, failedIndexesStr string, completions int32) error {
if len(completedIndexesStr) == 0 || len(failedIndexesStr) == 0 {
return nil

View file

@ -380,6 +380,11 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt
isReadyChanged := !ptr.Equal(oldJob.Status.Ready, newJob.Status.Ready)
isTerminatingChanged := !ptr.Equal(oldJob.Status.Terminating, newJob.Status.Terminating)
isSuspendedWithZeroCompletions := ptr.Equal(newJob.Spec.Suspend, ptr.To(true)) && ptr.Equal(newJob.Spec.Completions, ptr.To[int32](0))
// Detect job resume via condition changes (JobSuspended: True -> False)
// This handles the case where the controller updates status after the user has already
// changed spec.suspend=false, which is the scenario from https://github.com/kubernetes/kubernetes/issues/134521
isJobResuming := batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuspended) &&
batchvalidation.IsConditionFalse(newJob.Status.Conditions, batch.JobSuspended)
return batchvalidation.JobStatusValidationOptions{
// We allow to decrease the counter for succeeded pods for jobs which
@ -397,7 +402,7 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt
RejectFinishedJobWithActivePods: isJobFinishedChanged || isActiveChanged,
RejectFinishedJobWithoutStartTime: (isJobFinishedChanged || isStartTimeChanged) && !isSuspendedWithZeroCompletions,
RejectFinishedJobWithUncountedTerminatedPods: isJobFinishedChanged || isUncountedTerminatedPodsChanged,
RejectStartTimeUpdateForUnsuspendedJob: isStartTimeChanged,
RejectStartTimeUpdateForUnsuspendedJob: isStartTimeChanged && !isJobResuming,
RejectCompletionTimeBeforeStartTime: isStartTimeChanged || isCompletionTimeChanged,
RejectMutatingCompletionTime: true,
RejectNotCompleteJobWithCompletionTime: isJobCompleteChanged || isCompletionTimeChanged,

View file

@ -2563,6 +2563,33 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) {
{Type: field.ErrorTypeRequired, Field: "status.startTime"},
},
},
"verify startTime can be updated when resuming job (JobSuspended: True -> False)": {
enableJobManagedBy: true,
job: &batch.Job{
ObjectMeta: validObjectMeta,
Status: batch.JobStatus{
StartTime: &now,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionTrue,
},
},
},
},
newJob: &batch.Job{
ObjectMeta: validObjectMeta,
Status: batch.JobStatus{
StartTime: &nowPlusMinute,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: api.ConditionFalse,
},
},
},
},
},
"invalid attempt to set completionTime before startTime": {
enableJobManagedBy: true,
job: &batch.Job{

View file

@ -4095,6 +4095,85 @@ func TestSuspendJob(t *testing.T) {
}
}
// TestStartTimeUpdateOnResume verifies that the job controller can update startTime
// when resuming a suspended job (https://github.com/kubernetes/kubernetes/issues/134521).
func TestStartTimeUpdateOnResume(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)
closeFn, restConfig, clientSet, ns := setup(t, "suspend-starttime-validation")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
Completions: ptr.To[int32](2),
Suspend: ptr.To(false),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
}
if job.Status.StartTime == nil {
t.Fatalf("Job startTime was not set")
}
job.Spec.Suspend = ptr.To(true)
job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to suspend Job: %v", err)
}
validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{
Active: 0,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
}
if getJobConditionStatus(ctx, job, batchv1.JobSuspended) != v1.ConditionTrue {
t.Fatalf("JobSuspended condition was not set to True")
}
job.Spec.Suspend = ptr.To(false)
job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to resume Job: %v", err)
}
validateJobsPodsStatusOnly(ctx, t, clientSet, job, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
}
if getJobConditionStatus(ctx, job, batchv1.JobSuspended) != v1.ConditionFalse {
t.Error("JobSuspended condition was not set to False")
}
if job.Status.StartTime == nil {
t.Error("Job startTime was not set after resume")
}
}
// TestSuspendJobWithZeroCompletions verifies the suspended Job with
// completions=0 is marked as Complete.
func TestSuspendJobWithZeroCompletions(t *testing.T) {