mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-09 00:34:10 -04:00
Fix JobTrackingWithFinalizers when a pod succeeds after the job fails
Change-Id: I3be351fb3b53216948a37b1d58224f8fbbf22b47
This commit is contained in:
parent
e1ab1debdb
commit
24b8252b10
3 changed files with 108 additions and 28 deletions
|
|
@ -1021,7 +1021,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
|
|||
if podFinished || podTerminating || job.DeletionTimestamp != nil {
|
||||
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
|
||||
}
|
||||
if pod.Status.Phase == v1.PodSucceeded {
|
||||
if pod.Status.Phase == v1.PodSucceeded && !uncounted.failed.Has(string(pod.UID)) {
|
||||
if isIndexed {
|
||||
// The completion index is enough to avoid recounting succeeded pods.
|
||||
// No need to track UIDs.
|
||||
|
|
|
|||
|
|
@ -1640,6 +1640,32 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
"pod flips from failed to succeeded": {
|
||||
job: batch.Job{
|
||||
Spec: batch.JobSpec{
|
||||
Completions: pointer.Int32(2),
|
||||
Parallelism: pointer.Int32(2),
|
||||
},
|
||||
Status: batch.JobStatus{
|
||||
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
|
||||
Failed: []types.UID{"a", "b"},
|
||||
},
|
||||
},
|
||||
},
|
||||
pods: []*v1.Pod{
|
||||
buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod,
|
||||
buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
|
||||
},
|
||||
finishedCond: failedCond,
|
||||
wantRmFinalizers: 2,
|
||||
wantStatusUpdates: []batch.JobStatus{
|
||||
{
|
||||
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
|
||||
Failed: 2,
|
||||
Conditions: []batch.JobCondition{*failedCond},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -88,7 +89,7 @@ func TestNonParallelJob(t *testing.T) {
|
|||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
|
||||
// Failed Pod is replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
|
|
@ -101,7 +102,7 @@ func TestNonParallelJob(t *testing.T) {
|
|||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
|
||||
// No more Pods are created after the Pod succeeds.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
|
|
@ -156,7 +157,7 @@ func TestParallelJob(t *testing.T) {
|
|||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
|
||||
// Tracks ready pods, if enabled.
|
||||
if err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
|
||||
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
|
||||
t.Fatalf("Failed Marking Pods as ready: %v", err)
|
||||
}
|
||||
if tc.enableReadyPods {
|
||||
|
|
@ -165,7 +166,7 @@ func TestParallelJob(t *testing.T) {
|
|||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
|
||||
// Failed Pods are replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
want = podsByStatus{
|
||||
|
|
@ -177,7 +178,7 @@ func TestParallelJob(t *testing.T) {
|
|||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
// Once one Pod succeeds, no more Pods are created, even if some fail.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
want = podsByStatus{
|
||||
|
|
@ -189,7 +190,7 @@ func TestParallelJob(t *testing.T) {
|
|||
want.Ready = pointer.Int32(0)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
want = podsByStatus{
|
||||
|
|
@ -202,7 +203,7 @@ func TestParallelJob(t *testing.T) {
|
|||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
// No more Pods are created after remaining Pods succeed.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
|
|
@ -263,7 +264,7 @@ func TestParallelJobParallelism(t *testing.T) {
|
|||
}, wFinalizers)
|
||||
|
||||
// Succeed Job
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
|
|
@ -324,7 +325,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
|
||||
// Tracks ready pods, if enabled.
|
||||
if err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
|
||||
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
|
||||
t.Fatalf("Failed Marking Pods as ready: %v", err)
|
||||
}
|
||||
if tc.enableReadyPods {
|
||||
|
|
@ -333,7 +334,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
|
||||
// Failed Pods are replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
want = podsByStatus{
|
||||
|
|
@ -345,7 +346,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
// Pods are created until the number of succeeded Pods equals completions.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
want = podsByStatus{
|
||||
|
|
@ -358,7 +359,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
|
||||
// No more Pods are created after the Job completes.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
|
|
@ -420,7 +421,7 @@ func TestIndexedJob(t *testing.T) {
|
|||
// Disable feature gate and restart controller.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
@ -439,7 +440,7 @@ func TestIndexedJob(t *testing.T) {
|
|||
// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 3,
|
||||
|
|
@ -449,7 +450,7 @@ func TestIndexedJob(t *testing.T) {
|
|||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
|
||||
|
||||
// Remaining Pods succeed.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatal("Failed trying to succeed remaining pods")
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
|
|
@ -499,7 +500,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
|
|||
cancel()
|
||||
|
||||
// Fail a pod while Job controller is stopped.
|
||||
if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||
}
|
||||
|
||||
|
|
@ -525,7 +526,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
|
|||
cancel()
|
||||
|
||||
// Succeed a pod while Job controller is stopped.
|
||||
if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||
}
|
||||
|
||||
|
|
@ -614,7 +615,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
|
|||
|
||||
// Fail a pod ASAP.
|
||||
err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
|
|
@ -628,6 +629,56 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
|
|||
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
|
||||
}
|
||||
|
||||
// TestJobFailedWithInterrupts tests that a job were one pod fails and the rest
|
||||
// succeed is marked as Failed, even if the controller fails in the middle.
|
||||
func TestJobFailedWithInterrupts(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Completions: pointer.Int32(10),
|
||||
Parallelism: pointer.Int32(10),
|
||||
BackoffLimit: pointer.Int32(0),
|
||||
Template: v1.PodTemplateSpec{
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "foo", // Scheduled pods are not deleted immediately.
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Could not create job: %v", err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 10,
|
||||
}, true)
|
||||
t.Log("Finishing pods")
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
t.Fatalf("Could not fail a pod: %v", err)
|
||||
}
|
||||
remaining := 9
|
||||
if err := wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||
if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
|
||||
remaining -= succ
|
||||
t.Logf("Transient failure succeeding pods: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
|
||||
}
|
||||
t.Log("Recreating job controller")
|
||||
cancel()
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
|
||||
}
|
||||
|
||||
func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
||||
t.Helper()
|
||||
orphanPods := 0
|
||||
|
|
@ -827,7 +878,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
|
|||
// Disable feature gate and restart controller to test that pods get created.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, false)()
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get Job: %v", err)
|
||||
|
|
@ -1076,7 +1127,7 @@ func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset
|
|||
}
|
||||
}
|
||||
|
||||
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) error {
|
||||
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
|
||||
op := func(p *v1.Pod) bool {
|
||||
p.Status.Phase = phase
|
||||
return true
|
||||
|
|
@ -1084,7 +1135,7 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
|
|||
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
|
||||
}
|
||||
|
||||
func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) error {
|
||||
func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) {
|
||||
op := func(p *v1.Pod) bool {
|
||||
if podutil.IsPodReady(p) {
|
||||
return false
|
||||
|
|
@ -1098,10 +1149,10 @@ func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj
|
|||
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
|
||||
}
|
||||
|
||||
func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) error {
|
||||
func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) {
|
||||
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing Job Pods: %w", err)
|
||||
return fmt.Errorf("listing Job Pods: %w", err), 0
|
||||
}
|
||||
updates := make([]v1.Pod, 0, cnt)
|
||||
for _, pod := range pods.Items {
|
||||
|
|
@ -1116,15 +1167,16 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
|
|||
}
|
||||
}
|
||||
if len(updates) != cnt {
|
||||
return fmt.Errorf("couldn't set phase on %d Job Pods", cnt)
|
||||
return fmt.Errorf("couldn't set phase on %d Job Pods", cnt), 0
|
||||
}
|
||||
return updatePodStatuses(ctx, clientSet, updates)
|
||||
}
|
||||
|
||||
func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error {
|
||||
func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (error, int) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(updates))
|
||||
errCh := make(chan error, len(updates))
|
||||
var updated int32
|
||||
|
||||
for _, pod := range updates {
|
||||
pod := pod
|
||||
|
|
@ -1132,6 +1184,8 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat
|
|||
_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
} else {
|
||||
atomic.AddInt32(&updated, 1)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
|
@ -1140,10 +1194,10 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return fmt.Errorf("updating Pod status: %w", err)
|
||||
return fmt.Errorf("updating Pod status: %w", err), int(updated)
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
return nil, int(updated)
|
||||
}
|
||||
|
||||
func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
|
||||
|
|
|
|||
Loading…
Reference in a new issue