From f75e1b071d413cc0fa204feec6d2b6a0f7616fbd Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Fri, 18 Mar 2022 14:31:01 -0400 Subject: [PATCH] Remove finalizer when orphaned Change-Id: Id88a28755660812a274dffab2693cb8a0ef4235c --- pkg/controller/job/job_controller.go | 41 ++++++++++++++++++++++------ test/integration/job/job_test.go | 6 ++-- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 7511e85b059..10788865d84 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -236,7 +236,7 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta return job } -// When a pod is created, enqueue the controller that manages it and update it's expectations. +// When a pod is created, enqueue the controller that manages it and update its expectations. func (jm *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) if pod.DeletionTimestamp != nil { @@ -261,7 +261,12 @@ func (jm *Controller) addPod(obj interface{}) { return } - // Otherwise, it's an orphan. Get a list of all matching controllers and sync + // Otherwise, it's an orphan. + // Clean the finalizer. + if hasJobTrackingFinalizer(pod) { + jm.enqueueOrphanPod(pod) + } + // Get a list of all matching controllers and sync // them to see if anyone wants to adopt it. // DO NOT observe creation because no controller should be waiting for an // orphan. @@ -331,7 +336,12 @@ func (jm *Controller) updatePod(old, cur interface{}) { return } - // Otherwise, it's an orphan. If anything changed, sync matching controllers + // Otherwise, it's an orphan. + // Clean the finalizer. + if hasJobTrackingFinalizer(curPod) { + jm.enqueueOrphanPod(curPod) + } + // If anything changed, sync matching controllers // to see if anyone wants to adopt it now. labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if labelChanged || controllerRefChanged { @@ -364,17 +374,18 @@ func (jm *Controller) deletePod(obj interface{}, final bool) { } controllerRef := metav1.GetControllerOf(pod) + hasFinalizer := hasJobTrackingFinalizer(pod) if controllerRef == nil { // No controller should care about orphans being deleted. // But this pod might have belonged to a Job and the GC removed the reference. - if hasJobTrackingFinalizer(pod) { + if hasFinalizer { jm.enqueueOrphanPod(pod) } return } job := jm.resolveControllerRef(pod.Namespace, controllerRef) if job == nil { - if hasJobTrackingFinalizer(pod) { + if hasFinalizer { jm.enqueueOrphanPod(pod) } return @@ -387,7 +398,7 @@ func (jm *Controller) deletePod(obj interface{}, final bool) { // Consider the finalizer removed if this is the final delete. Otherwise, // it's an update for the deletion timestamp, then check finalizer. - if final || !hasJobTrackingFinalizer(pod) { + if final || !hasFinalizer { jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) } @@ -441,10 +452,14 @@ func (jm *Controller) deleteJob(obj interface{}) { } } // Listing pods shouldn't really fail, as we are just querying the informer cache. - pods, _ := jm.podStore.Pods(jobObj.Namespace).List(labels.Everything()) + selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector) + if err != nil { + utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err)) + return + } + pods, _ := jm.podStore.Pods(jobObj.Namespace).List(selector) for _, pod := range pods { - controllerRef := metav1.GetControllerOf(pod) - if (controllerRef == nil || controllerRef.UID == jobObj.UID) && hasJobTrackingFinalizer(pod) { + if metav1.IsControlledBy(pod, jobObj) && hasJobTrackingFinalizer(pod) { jm.enqueueOrphanPod(pod) } } @@ -567,6 +582,14 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error { } return err } + // Make sure the pod is still orphaned. + if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil { + job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef) + if job != nil { + // The pod was adopted. Do not remove finalizer. + return nil + } + } if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil { if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) { return err diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index ed932cf6d8f..e0ab02c487f 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -420,7 +420,7 @@ func TestIndexedJob(t *testing.T) { // Disable feature gate and restart controller. defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)() cancel() - ctx, cancel = startJobController(restConfig, clientSet) + ctx, cancel = startJobController(restConfig) events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{}) if err != nil { t.Fatal(err) @@ -439,7 +439,7 @@ func TestIndexedJob(t *testing.T) { // Re-enable feature gate and restart controller. Failed Pod should be recreated now. defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)() cancel() - ctx, cancel = startJobController(restConfig, clientSet) + ctx, cancel = startJobController(restConfig) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: 3, @@ -779,7 +779,7 @@ func TestSuspendJobControllerRestart(t *testing.T) { // Disable feature gate and restart controller to test that pods get created. defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, false)() cancel() - ctx, cancel = startJobController(restConfig, clientSet) + ctx, cancel = startJobController(restConfig) job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to get Job: %v", err)