diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 2a811639d7f..0eef6eed394 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -123,6 +123,10 @@ type Controller struct { // Store with information to compute the expotential backoff delay for pod // recreation in case of pod failures. podBackoffStore *backoffStore + + // completedJobStore contains the job ids for which the job status is updated to completed + // but the corresponding event is not yet received. + completedJobStore *jobUIDCache } type syncJobCtx struct { @@ -184,6 +188,9 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), clock: clock, podBackoffStore: newBackoffStore(), + completedJobStore: &jobUIDCache{ + set: sets.New[types.UID](), + }, } if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -536,6 +543,7 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) { return } } + jm.completedJobStore.remove(jobObj.UID) jm.enqueueLabelSelector(jobObj) } @@ -820,7 +828,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { } return err } - // Skip syncing of the job it is managed by another controller. // We cannot rely solely on skipping of queueing such jobs for synchronization, // because it is possible a synchronization task is queued for a job, without @@ -841,6 +848,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { // re-syncing here as the record has to be removed for finished/deleted jobs return fmt.Errorf("error removing backoff record %w", err) } + jm.completedJobStore.remove(job.UID) + return nil + } + if jm.completedJobStore.exists(job.UID) { + logger.V(2).Info("Skip syncing the job as its marked completed but the completed update event is not yet received", "uid", job.UID, "key", key) return nil } @@ -1304,6 +1316,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job } if jobFinished { jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition) + jm.completedJobStore.add(jobCtx.job.UID) } recordJobPodFinished(logger, jobCtx.job, oldCounters) } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 8cd66c0d1d1..a2b2a5fe5bc 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -56,6 +56,7 @@ import ( _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/job/metrics" + "k8s.io/kubernetes/pkg/controller/job/util" "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/clock" @@ -2879,6 +2880,110 @@ func TestSingleJobFailedCondition(t *testing.T) { } +func TestJobControllerMissingJobSucceedEvent(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) + logger, ctx := ktesting.NewTestContext(t) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) + job1.Name = "job1" + clientSet := fake.NewSimpleClientset(job1) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) + jm.podControl = &controller.RealPodControl{ + KubeClient: clientSet, + Recorder: testutil.NewFakeRecorder(), + } + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + err := informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + if err != nil { + t.Fatalf("Unexpected error when adding job to indexer %v", err) + } + // 1st reconcile should create a new pod + err = jm.syncJob(ctx, testutil.GetKey(job1, t)) + if err != nil { + t.Fatalf("Unexpected error when syncing jobs %v", err) + } + + podIndexer := informer.Core().V1().Pods().Informer().GetIndexer() + podList, err := clientSet.Tracker().List( + schema.GroupVersionResource{Version: "v1", Resource: "pods"}, + schema.GroupVersionKind{Version: "v1", Kind: "Pod"}, + "default") + if err != nil { + t.Fatalf("Unexpected error when fetching pods %v", err) + } + // manually adding the just-created pod from fake clientset memory to informer cache because informer is not started. + // we are updating the pod status to succeeded which should update the job status to succeeded and remove the finalizer of the pod. + justCreatedPod := podList.(*v1.PodList).Items[0] + fmt.Printf("pod is %v\n", podList.(*v1.PodList).Items[0]) + justCreatedPod.Status.Phase = v1.PodSucceeded + err = podIndexer.Add(&justCreatedPod) + if err != nil { + t.Fatalf("Unexpected error when adding pod to indexer %v", err) + } + jm.addPod(logger, &justCreatedPod) + err = jm.syncJob(ctx, testutil.GetKey(job1, t)) + if err != nil { + t.Fatalf("Unexpected error when syncing jobs %v", err) + } + + jobList, err := clientSet.Tracker().List( + schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}, + schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}, + "default") + if err != nil { + t.Fatalf("Unexpected error when trying to get job from the store: %v", err) + } + updatedJob := jobList.(*batch.JobList).Items[0] + if !util.IsJobSucceeded(&updatedJob) { + t.Fatalf("job status is not succeeded: %v", updatedJob) + } + + // add the updated pod from the fake clientset memory to informer cache because informer is not started. + podList, err = clientSet.Tracker().List( + schema.GroupVersionResource{Version: "v1", Resource: "pods"}, + schema.GroupVersionKind{Version: "v1", Kind: "Pod"}, + "default") + if err != nil { + t.Fatalf("Unexpected error when fetching pods %v", err) + } + fmt.Printf("pod is %v\n", podList.(*v1.PodList).Items[0]) + updatedPod := podList.(*v1.PodList).Items[0] + updatedPod.Status.Phase = v1.PodSucceeded + err = podIndexer.Add(&updatedPod) + if err != nil { + t.Fatalf("Unexpected error when adding pod to indexer %v", err) + } + + // removing the just created pod from fake clientset memory inorder for the sync job to succeed if creating a new pod because of bug + // but the pod will remain inside informer cache + err = clientSet.Tracker().Delete( + schema.GroupVersionResource{Version: "v1", Resource: "pods"}, + "default", "") + if err != nil { + t.Fatalf("Unexpected error when deleting pod to indexer %v", err) + } + + err = jm.syncJob(ctx, testutil.GetKey(job1, t)) + if err != nil { + t.Fatalf("Unexpected error when syncing jobs %v", err) + } + time.Sleep(time.Second) + + podList, err = clientSet.Tracker().List( + schema.GroupVersionResource{Version: "v1", Resource: "pods"}, + schema.GroupVersionKind{Version: "v1", Kind: "Pod"}, + "default") + if err != nil { + t.Fatalf("Unexpected error when syncing jobs %v", err) + } + // no pod should be created + if len(podList.(*v1.PodList).Items) != 0 { + t.Errorf("expect no pods to be created but %v pods are created", len(podList.(*v1.PodList).Items)) + } +} + func TestSyncJobComplete(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) diff --git a/pkg/controller/job/tracking_utils.go b/pkg/controller/job/tracking_utils.go index 7e5147eb76f..c7327b0e3f2 100644 --- a/pkg/controller/job/tracking_utils.go +++ b/pkg/controller/job/tracking_utils.go @@ -151,3 +151,26 @@ func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool { } return (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) && hasJobTrackingFinalizer(pod) } + +type jobUIDCache struct { + sync.RWMutex + set sets.Set[types.UID] +} + +func (j *jobUIDCache) add(uid types.UID) { + j.Lock() + defer j.Unlock() + j.set.Insert(uid) +} + +func (j *jobUIDCache) remove(uid types.UID) { + j.Lock() + defer j.Unlock() + j.set.Delete(uid) +} + +func (j *jobUIDCache) exists(uid types.UID) bool { + j.RLock() + defer j.RUnlock() + return j.set.Has(uid) +}