diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index 9b8b1ac491d..be136886d45 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -192,9 +192,14 @@ func (e *TokensController) Run(ctx context.Context, workers int) { } func (e *TokensController) queueServiceAccountSync(obj interface{}) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } if serviceAccount, ok := obj.(*v1.ServiceAccount); ok { e.syncServiceAccountQueue.Add(makeServiceAccountKey(serviceAccount)) + return } + utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type: %T", obj)) } func (e *TokensController) queueServiceAccountUpdateSync(oldObj interface{}, newObj interface{}) { diff --git a/pkg/controller/serviceaccount/tokens_controller_test.go b/pkg/controller/serviceaccount/tokens_controller_test.go index e1fef112861..66a19142cd1 100644 --- a/pkg/controller/serviceaccount/tokens_controller_test.go +++ b/pkg/controller/serviceaccount/tokens_controller_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/test/utils/ktesting" ) @@ -551,3 +552,28 @@ func TestTokenCreation(t *testing.T) { }) } } + +func TestQueueServiceAccountSync_Tombstone(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + sa := serviceAccount(emptySecretReferences()) + tombstone := cache.DeletedFinalStateUnknown{ + Key: "default/default", + Obj: sa, + } + + client := fake.NewClientset(sa) + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + tokenController, err := NewTokensController(logger, informerFactory.Core().V1().ServiceAccounts(), informerFactory.Core().V1().Secrets(), client, TokensControllerOptions{}) + if err != nil { + t.Fatalf("error creating Tokens controller: %v", err) + } + + tokenController.queueServiceAccountSync(tombstone) + if tokenController.syncServiceAccountQueue.Len() != 1 { + t.Errorf("expected 1 item in queue, got %d", tokenController.syncServiceAccountQueue.Len()) + } + key, _ := tokenController.syncServiceAccountQueue.Get() + if key.uid != sa.UID { + t.Errorf("expected UID %s, got %s", sa.UID, key.uid) + } +} diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index bcad81c509c..9a15c63ed42 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -540,8 +540,12 @@ func (adc *attachDetachController) podUpdate(logger klog.Logger, oldObj, newObj } func (adc *attachDetachController) podDelete(logger klog.Logger, obj interface{}) { + if tombstone, ok := obj.(kcache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } pod, ok := obj.(*v1.Pod) - if pod == nil || !ok { + if !ok { + runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) return } diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 044e1dc3823..5596fe6e796 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -736,3 +736,29 @@ func verifyAttachDetachCalls(t *testing.T, testPlugin *controllervolumetesting.T t.Fatalf("Fatal error encountered in the testing volume plugin") } } + +func TestPodDelete_Tombstone(t *testing.T) { + _, tCtx := ktesting.NewTestContext(t) + fakeKubeClient := controllervolumetesting.CreateTestClient(tCtx.Logger()) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + + adc := createADC(t, tCtx, fakeKubeClient, informerFactory, + append(controllervolumetesting.CreateTestPlugin(false), csi.ProbeVolumePlugins()...)) + + pod := controllervolumetesting.NewPodWithVolume("pod1", "vol1", "node1") + volumeName := v1.UniqueVolumeName(csiPDUniqueNamePrefix + "pdName") + + adc.desiredStateOfWorld.AddNode("node1") + adc.podAdd(tCtx.Logger(), pod) + + if !adc.desiredStateOfWorld.VolumeExists(volumeName, "node1") { + t.Fatalf("expected volume %s to exist in dsw", volumeName) + } + + // Tombstone + adc.podDelete(tCtx.Logger(), kcache.DeletedFinalStateUnknown{Key: "mynamespace/pod1", Obj: pod}) + + if adc.desiredStateOfWorld.VolumeExists(volumeName, "node1") { + t.Errorf("expected volume %s to be removed from dsw", volumeName) + } +}