handle job complete update delayed event

This commit is contained in:
Keerthan Reddy Mala 2025-02-20 20:13:56 -08:00
parent 6be1530bbc
commit 547c005cb7
3 changed files with 142 additions and 1 deletions

View file

@ -123,6 +123,10 @@ type Controller struct {
// Store with information to compute the expotential backoff delay for pod
// recreation in case of pod failures.
podBackoffStore *backoffStore
// completedJobStore contains the job ids for which the job status is updated to completed
// but the corresponding event is not yet received.
completedJobStore *jobUIDCache
}
type syncJobCtx struct {
@ -184,6 +188,9 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
clock: clock,
podBackoffStore: newBackoffStore(),
completedJobStore: &jobUIDCache{
set: sets.New[types.UID](),
},
}
if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -536,6 +543,7 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
return
}
}
jm.completedJobStore.remove(jobObj.UID)
jm.enqueueLabelSelector(jobObj)
}
@ -820,7 +828,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
}
return err
}
// Skip syncing of the job it is managed by another controller.
// We cannot rely solely on skipping of queueing such jobs for synchronization,
// because it is possible a synchronization task is queued for a job, without
@ -841,6 +848,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// re-syncing here as the record has to be removed for finished/deleted jobs
return fmt.Errorf("error removing backoff record %w", err)
}
jm.completedJobStore.remove(job.UID)
return nil
}
if jm.completedJobStore.exists(job.UID) {
logger.V(2).Info("Skip syncing the job as its marked completed but the completed update event is not yet received", "uid", job.UID, "key", key)
return nil
}
@ -1304,6 +1316,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
}
if jobFinished {
jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
jm.completedJobStore.add(jobCtx.job.UID)
}
recordJobPodFinished(logger, jobCtx.job, oldCounters)
}

View file

@ -56,6 +56,7 @@ import (
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/controller/job/util"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/clock"
@ -2879,6 +2880,110 @@ func TestSingleJobFailedCondition(t *testing.T) {
}
func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
logger, ctx := ktesting.NewTestContext(t)
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
clientSet := fake.NewSimpleClientset(job1)
fakeClock := clocktesting.NewFakeClock(time.Now())
jm, informer := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
jm.podControl = &controller.RealPodControl{
KubeClient: clientSet,
Recorder: testutil.NewFakeRecorder(),
}
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
err := informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
if err != nil {
t.Fatalf("Unexpected error when adding job to indexer %v", err)
}
// 1st reconcile should create a new pod
err = jm.syncJob(ctx, testutil.GetKey(job1, t))
if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err)
}
podIndexer := informer.Core().V1().Pods().Informer().GetIndexer()
podList, err := clientSet.Tracker().List(
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
"default")
if err != nil {
t.Fatalf("Unexpected error when fetching pods %v", err)
}
// manually adding the just-created pod from fake clientset memory to informer cache because informer is not started.
// we are updating the pod status to succeeded which should update the job status to succeeded and remove the finalizer of the pod.
justCreatedPod := podList.(*v1.PodList).Items[0]
fmt.Printf("pod is %v\n", podList.(*v1.PodList).Items[0])
justCreatedPod.Status.Phase = v1.PodSucceeded
err = podIndexer.Add(&justCreatedPod)
if err != nil {
t.Fatalf("Unexpected error when adding pod to indexer %v", err)
}
jm.addPod(logger, &justCreatedPod)
err = jm.syncJob(ctx, testutil.GetKey(job1, t))
if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err)
}
jobList, err := clientSet.Tracker().List(
schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"},
schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"},
"default")
if err != nil {
t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
}
updatedJob := jobList.(*batch.JobList).Items[0]
if !util.IsJobSucceeded(&updatedJob) {
t.Fatalf("job status is not succeeded: %v", updatedJob)
}
// add the updated pod from the fake clientset memory to informer cache because informer is not started.
podList, err = clientSet.Tracker().List(
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
"default")
if err != nil {
t.Fatalf("Unexpected error when fetching pods %v", err)
}
fmt.Printf("pod is %v\n", podList.(*v1.PodList).Items[0])
updatedPod := podList.(*v1.PodList).Items[0]
updatedPod.Status.Phase = v1.PodSucceeded
err = podIndexer.Add(&updatedPod)
if err != nil {
t.Fatalf("Unexpected error when adding pod to indexer %v", err)
}
// removing the just created pod from fake clientset memory inorder for the sync job to succeed if creating a new pod because of bug
// but the pod will remain inside informer cache
err = clientSet.Tracker().Delete(
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
"default", "")
if err != nil {
t.Fatalf("Unexpected error when deleting pod to indexer %v", err)
}
err = jm.syncJob(ctx, testutil.GetKey(job1, t))
if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err)
}
time.Sleep(time.Second)
podList, err = clientSet.Tracker().List(
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
"default")
if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err)
}
// no pod should be created
if len(podList.(*v1.PodList).Items) != 0 {
t.Errorf("expect no pods to be created but %v pods are created", len(podList.(*v1.PodList).Items))
}
}
func TestSyncJobComplete(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})

View file

@ -151,3 +151,26 @@ func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool {
}
return (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) && hasJobTrackingFinalizer(pod)
}
type jobUIDCache struct {
sync.RWMutex
set sets.Set[types.UID]
}
func (j *jobUIDCache) add(uid types.UID) {
j.Lock()
defer j.Unlock()
j.set.Insert(uid)
}
func (j *jobUIDCache) remove(uid types.UID) {
j.Lock()
defer j.Unlock()
j.set.Delete(uid)
}
func (j *jobUIDCache) exists(uid types.UID) bool {
j.RLock()
defer j.RUnlock()
return j.set.Has(uid)
}