From c69259cb7134d4e4ce7521c052ce5ff1729e2eca Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 24 Oct 2025 10:09:24 +0200 Subject: [PATCH] DRA device taints: switch to workqueue in controller The approach copied from node taint eviction was to fire off one goroutine per pod at the intended time. This leads to the "thundering herd" problem: when a single taint causes eviction of several pods and those all have no or the same toleration grace period, then they all get deleted concurrently at the same time. For node taint eviction that is limited by the number of pods per node, which is typically ~100. In an integration test, that already led to problems with watchers: cacher.go:855] cacher (pods): 100 objects queued in incoming channel. cache_watcher.go:203] Forcing pods watcher close due to unresponsiveness: key: "/pods/", labels: "", fields: "". len(c.input) = 10, len(c.result) = 10, graceful = false It also causes spikes in memory consumption (mostly the 2KB stack per goroutine plus closure) with no upper limit. Using a workqueue makes concurrency more deterministic because there is an upper limit. In the integration test, 10 workers kept the watch active. Another advantage is that failures to evict the pod get retried with exponential backoff per affected pod forever. Previously, evicting was tried a few times with a fixed rate and then the controller gave up. If the apiserver was down long enough, pods didn't get evicted. 
--- .../device_taint_eviction.go | 266 +++++++++++++----- .../device_taint_eviction_test.go | 120 +++----- 2 files changed, 242 insertions(+), 144 deletions(-) diff --git a/pkg/controller/devicetainteviction/device_taint_eviction.go b/pkg/controller/devicetainteviction/device_taint_eviction.go index 6c9b201773e..bff689e108e 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction.go @@ -48,6 +48,7 @@ import ( resourcealphalisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/klog/v2" apipod "k8s.io/kubernetes/pkg/api/v1/pod" @@ -57,9 +58,12 @@ import ( ) const ( - // retries is the number of times that the controller tries to delete a pod - // that needs to be evicted. - retries = 5 + // This is a compromise between getting work done and not overwhelming the apiserver + // and pod informers. Integration testing with 100 workers modified pods so quickly + // that a watch in the integration test couldn't keep up: + // cacher.go:855] cacher (pods): 100 objects queued in incoming channel. + // cache_watcher.go:203] Forcing pods watcher close due to unresponsiveness: key: "/pods/", labels: "", fields: "". len(c.input) = 10, len(c.result) = 10, graceful = false + numWorkers = 10 ) // Controller listens to Taint changes of DRA devices and Toleration changes of ResourceClaims, @@ -94,7 +98,9 @@ type Controller struct { classInformer resourceinformers.DeviceClassInformer ruleLister resourcealphalisters.DeviceTaintRuleLister haveSynced []cache.InformerSynced + hasSynced atomic.Int32 metrics metrics.Metrics + workqueue workqueue.TypedRateLimitingInterface[workItem] // evictPod ensures that the pod gets evicted at the specified time. // It doesn't block. 
@@ -104,13 +110,24 @@ type Controller struct { // Idempotent, returns false if there was nothing to cancel. cancelEvict func(pod tainteviction.NamespacedObject) bool + // mutex protects the following shared data structures. + mutex sync.Mutex + + // deletePodAt maps a pod to the time when it is meant to be evicted. + // + // The entry for pod gets deleted when eviction is no longer necessary + // and updated when the time changes. + deletePodAt map[tainteviction.NamespacedObject]time.Time + + // maybeDeletePodCount counts how often a worker checked a pod. + // This is useful for unit testing, but probably not a good public metric. + maybeDeletePodCount int64 + // allocatedClaims holds all currently known allocated claims. allocatedClaims map[types.NamespacedName]allocatedClaim // A value is slightly more efficient in BenchmarkTaintUntaint (less allocations!). // pools indexes all slices by driver and pool name. pools map[poolID]pool - - hasSynced atomic.Int32 } type poolID struct { @@ -218,35 +235,77 @@ type allocatedClaim struct { evictionTime *metav1.Time } -func (tc *Controller) deletePodHandler(c clientset.Interface, emitEventFunc func(tainteviction.NamespacedObject)) func(ctx context.Context, fireAt time.Time, args *tainteviction.WorkArgs) error { - return func(ctx context.Context, fireAt time.Time, args *tainteviction.WorkArgs) error { - var err error - for i := 0; i < retries; i++ { - if i > 0 { - time.Sleep(10 * time.Millisecond) - } - err = addConditionAndDeletePod(ctx, c, args.Object, &emitEventFunc) - if apierrors.IsNotFound(err) { - // Not a problem, the work is done. - // But we didn't do it, so don't - // bump the metric. - return nil - } - if err == nil { - podDeletionLatency := time.Since(fireAt) - // TODO: include more information why it was evicted. 
- klog.FromContext(ctx).Info("Evicted pod by deleting it", "pod", args.Object, "latency", podDeletionLatency) - tc.metrics.PodDeletionsTotal.Inc() - tc.metrics.PodDeletionsLatency.Observe(float64(podDeletionLatency.Seconds())) - return nil - } - } - return err - } +// workItem is stored in a workqueue and describes some piece of work which +// needs to be done. +// +// Right now that work is deleting pods. +// Updating DeviceTaintRule status will be added later. +type workItem struct { + // podRef references a pod which may need to be deleted. + // + // Controller.deletePodAt is the source of truth for if and when the pod really needs to be removed. + podRef tainteviction.NamespacedObject } -func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef tainteviction.NamespacedObject, emitEventFunc *func(tainteviction.NamespacedObject)) (err error) { - pod, err := c.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{}) +// maybeDeletePod checks whether the pod needs to be deleted now and if so, does it. +// Three results are possible: +// - an error if anything goes wrong and the operation needs to be repeated +// - a positive delay if the operation needs to be repeated in the future +// - a zero delay if the deletion is done or no longer necessary +func (tc *Controller) maybeDeletePod(ctx context.Context, podRef tainteviction.NamespacedObject) (againAfter time.Duration, finalErr error) { + logger := klog.FromContext(ctx) + + // We must not hold this mutex while doing blocking API calls. + // TODO: try an atomic map instead. 
+ tc.mutex.Lock() + tc.maybeDeletePodCount++ + fireAt, ok := tc.deletePodAt[podRef] + tc.mutex.Unlock() + logger.V(5).Info("Processing pod deletion work item", "pod", podRef, "active", ok, "fireAt", fireAt) + + if !ok { + logger.V(5).Info("Work item for pod deletion obsolete, nothing to do", "pod", podRef) + return 0, nil + } + + now := time.Now() + againAfter = fireAt.Sub(now) + if againAfter > 0 { + // Not yet. Maybe the fireAt time got updated. + return againAfter, nil + } + + defer func() { + if finalErr == nil { + // Forget the deletion time, we are done. + tc.mutex.Lock() + delete(tc.deletePodAt, podRef) + tc.mutex.Unlock() + } + }() + + err := tc.addConditionAndDeletePod(ctx, podRef) + if apierrors.IsNotFound(err) { + // Not a problem, the work is done. + // But we didn't do it, so don't + // bump the metric. + return 0, nil + } + if err != nil { + return 0, err + } + + podDeletionLatency := time.Since(fireAt) + // TODO: include more information why it was evicted. + klog.FromContext(ctx).Info("Evicted pod by deleting it", "pod", podRef, "latency", podDeletionLatency) + tc.metrics.PodDeletionsTotal.Inc() + tc.metrics.PodDeletionsLatency.Observe(float64(podDeletionLatency.Seconds())) + + return 0, nil +} + +func (tc *Controller) addConditionAndDeletePod(ctx context.Context, podRef tainteviction.NamespacedObject) (err error) { + pod, err := tc.client.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{}) if err != nil { return err } @@ -258,12 +317,14 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef return apierrors.NewNotFound(v1.SchemeGroupVersion.WithResource("pods").GroupResource(), pod.Name) } - // Emit the event only once, and only if we are actually doing something. - if *emitEventFunc != nil { - (*emitEventFunc)(podRef) - *emitEventFunc = nil + if pod.DeletionTimestamp != nil { + // Already deleted, no need to evict. + return nil } + // Emit the event only if we are actually doing something. 
+ tc.emitPodDeletionEvent(podRef) + newStatus := pod.Status.DeepCopy() updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{ Type: v1.DisruptionTarget, @@ -272,7 +333,7 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef Message: "Device Taint manager: deleting due to NoExecute taint", }) if updated { - if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { + if _, _, _, err := utilpod.PatchPodStatus(ctx, tc.client, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { return err } } @@ -280,7 +341,7 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef // another pod using the same name in the meantime. Include a precondition // to prevent that race. This delete attempt then fails and the next one detects // the new pod and stops retrying. - return c.CoreV1().Pods(podRef.Namespace).Delete(ctx, podRef.Name, metav1.DeleteOptions{ + return tc.client.CoreV1().Pods(podRef.Namespace).Delete(ctx, podRef.Name, metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{ UID: &podRef.UID, }, @@ -303,6 +364,7 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo taintInformer: taintInformer, classInformer: classInformer, ruleLister: taintInformer.Lister(), + deletePodAt: make(map[tainteviction.NamespacedObject]time.Time), allocatedClaims: make(map[types.NamespacedName]allocatedClaim), pools: make(map[poolID]pool), // Instantiate all informers now to ensure that they get started. 
@@ -325,9 +387,12 @@ func (tc *Controller) Run(ctx context.Context) error { defer utilruntime.HandleCrash() logger := klog.FromContext(ctx) logger.Info("Starting", "controller", tc.name) - defer logger.Info("Shutting down controller", "controller", tc.name) + defer logger.Info("Shut down controller", "controller", tc.name, "reason", context.Cause(ctx)) tc.logger = logger + var wg sync.WaitGroup + defer wg.Wait() + // Doing debug logging? if loggerV := logger.V(6); loggerV.Enabled() { tc.eventLogger = &loggerV @@ -341,21 +406,47 @@ func (tc *Controller) Run(ctx context.Context) error { tc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: tc.name}).WithLogger(logger) defer eventBroadcaster.Shutdown() - taintEvictionQueue := tainteviction.CreateWorkerQueue(tc.deletePodHandler(tc.client, tc.emitPodDeletionEvent)) + queueLogger := klog.LoggerWithName(logger, "workqueue") + delayingQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[workItem]{ + Logger: &queueLogger, + Name: tc.name, + }) + tc.workqueue = workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[workItem](), + workqueue.TypedRateLimitingQueueConfig[workItem]{ + Name: tc.name, + DelayingQueue: delayingQueue, + }, + ) + defer func() { + logger.V(3).Info("Shutting down work queue") + tc.workqueue.ShutDown() + }() + evictPod := tc.evictPod tc.evictPod = func(podRef tainteviction.NamespacedObject, fireAt time.Time) { + tc.deletePodAt[podRef] = fireAt + now := time.Now() + tc.workqueue.AddAfter(workItem{podRef: podRef}, fireAt.Sub(now)) + // Only relevant for testing. if evictPod != nil { evictPod(podRef, fireAt) } - taintEvictionQueue.UpdateWork(ctx, &tainteviction.WorkArgs{Object: podRef}, time.Now(), fireAt) } cancelEvict := tc.cancelEvict tc.cancelEvict = func(podRef tainteviction.NamespacedObject) bool { + _, ok := tc.deletePodAt[podRef] + if !ok { + // Nothing to cancel. 
+ return false + } + delete(tc.deletePodAt, podRef) if cancelEvict != nil { cancelEvict(podRef) } - return taintEvictionQueue.CancelWork(logger, podRef.NamespacedName.String()) + // Cannot remove from a work queue. The worker will detect that the entry is obsolete by checking deletePodAt. + return true } // Start events processing pipeline. @@ -369,9 +460,6 @@ func (tc *Controller) Run(ctx context.Context) error { } defer eventBroadcaster.Shutdown() - // mutex serializes event processing. - var mutex sync.Mutex - claimHandler, err := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { claim, ok := obj.(*resourceapi.ResourceClaim) @@ -379,8 +467,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleClaimChange(nil, claim) }, UpdateFunc: func(oldObj, newObj any) { @@ -393,8 +481,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleClaimChange(oldClaim, newClaim) }, DeleteFunc: func(obj any) { @@ -406,8 +494,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleClaimChange(claim, nil) }, }) @@ -426,8 +514,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handlePodChange(nil, pod) }, UpdateFunc: func(oldObj, newObj any) { @@ -440,8 +528,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { 
logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handlePodChange(oldPod, newPod) }, DeleteFunc: func(obj any) { @@ -453,8 +541,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handlePodChange(pod, nil) }, }) @@ -473,8 +561,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected DeviceTaintRule", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleRuleChange(nil, rule) }, UpdateFunc: func(oldObj, newObj any) { @@ -487,8 +575,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { logger.Error(nil, "Expected DeviceTaintRule", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleRuleChange(oldRule, newRule) }, DeleteFunc: func(obj any) { @@ -500,8 +588,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected DeviceTaintRule", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleRuleChange(rule, nil) }, }) @@ -520,8 +608,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleSliceChange(nil, slice) }, UpdateFunc: func(oldObj, newObj any) { @@ -534,8 +622,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() 
tc.handleSliceChange(oldSlice, newSlice) }, DeleteFunc: func(obj any) { @@ -545,8 +633,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleSliceChange(slice, nil) }, }) @@ -565,10 +653,50 @@ func (tc *Controller) Run(ctx context.Context) error { logger.V(1).Info("Underlying informers have synced") tc.hasSynced.Store(1) + for i := range numWorkers { + wg.Go(func() { + tc.worker(klog.NewContext(ctx, klog.LoggerWithName(queueLogger, fmt.Sprintf("worker-%d", i)))) + }) + } + <-ctx.Done() return nil } +// worker blocks until the workqueue is shut down. +// Cancellation of the context only aborts on-going work. +func (tc *Controller) worker(ctx context.Context) { + logger := klog.FromContext(ctx) + defer utilruntime.HandleCrashWithLogger(logger) + + for { + item, shutdown := tc.workqueue.Get() + if shutdown { + return + } + + func() { + defer tc.workqueue.Done(item) + + var err error + var againAfter time.Duration + if item.podRef.Name != "" { + againAfter, err = tc.maybeDeletePod(ctx, item.podRef) + } + switch { + case err != nil: + logger.V(3).Info("Evicting pod failed, will retry", "err", err) + tc.workqueue.AddRateLimited(item) + case againAfter > 0: + logger.V(5).Info("Checking pod eviction again later", "delay", againAfter) + tc.workqueue.AddAfter(item, againAfter) + default: + tc.workqueue.Forget(item) + } + }() + } +} + func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) { claim := newClaim if claim == nil { diff --git a/pkg/controller/devicetainteviction/device_taint_eviction_test.go b/pkg/controller/devicetainteviction/device_taint_eviction_test.go index 9795925ad40..fdf616d0d75 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction_test.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction_test.go @@ -1593,22 
+1593,45 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) { slice := sliceTainted.DeepCopy() slice.Spec.Devices[0].Taints[0].TimeAdded = &metav1.Time{Time: time.Now()} claim := inUseClaim.DeepCopy() + tolerationSeconds := int64(60) claim.Status.Allocation.Devices.Results[0].Tolerations = []resourceapi.DeviceToleration{{ Operator: resourceapi.DeviceTolerationOpExists, Effect: resourceapi.DeviceTaintEffectNoExecute, - TolerationSeconds: ptr.To(int64(60)), + TolerationSeconds: &tolerationSeconds, }} fakeClientset := fake.NewSimpleClientset( slice, claim, pod, ) - tCtx = ktesting.WithClients(tCtx, nil, nil, fakeClientset, nil, nil) - pod, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{}) require.NoError(tCtx, err, "get pod before eviction") assert.Equal(tCtx, podWithClaimName, pod, "test pod") + var podGets int + var podUpdates int + var podDeletions int + + fakeClientset.PrependReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + podGets++ + podName := action.(core.GetAction).GetName() + assert.Equal(tCtx, podWithClaimName.Name, podName, "name of pod to patch") + return false, nil, nil + }) + fakeClientset.PrependReactor("patch", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + podUpdates++ + podName := action.(core.PatchAction).GetName() + assert.Equal(tCtx, podWithClaimName.Name, podName, "name of pod to get") + return false, nil, nil + }) + fakeClientset.PrependReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + podDeletions++ + podName := action.(core.DeleteAction).GetName() + assert.Equal(tCtx, podWithClaimName.Name, podName, "name of pod to delete") + return false, nil, nil + }) + + tCtx = ktesting.WithClients(tCtx, nil, nil, fakeClientset, nil, nil) controller := newTestController(tCtx, fakeClientset) var mutex sync.Mutex @@ -1666,10 +1689,29 @@ func testCancelEviction(tCtx 
ktesting.TContext, deletePod bool) { ktesting.Eventually(tCtx, listEvents).WithTimeout(30 * time.Second).Should(matchCancellationEvent()) } tCtx.Wait() + matchEvents := matchCancellationEvent() if deletePod { matchEvents = gomega.BeEmpty() + assert.Equal(tCtx, 1, podDeletions, "Pod should have been deleted exactly once by test.") + } else { + assert.Equal(tCtx, 0, podDeletions, "Pod should not have been deleted.") } + + // Naively (?) one could expect synctest.Wait to have blocked until the work item added via AddAfter + // got processed because before that the overall state isn't stable yet. But the workqueue package + // seems to implement AddAfter in a way which is not detected as "blocking on time to pass" by + // synctest and therefore it returns without advancing time enough. + // + // Here we trigger that manually as a workaround (?). The factor doesn't really matter. + // Commenting this out causes the controller.maybeDeletePodCount check to fail. + time.Sleep(10 * time.Duration(tolerationSeconds) * time.Second) + tCtx.Wait() + + assert.Equal(tCtx, 0, podGets, "Worker should not have needed to get the pod.") + assert.Equal(tCtx, 0, podUpdates, "Worker should not have needed to update the pod.") + assert.Equal(tCtx, 0, controller.workqueue.Len(), "Work queue should be empty now.") + assert.Equal(tCtx, int64(1), controller.maybeDeletePodCount, "Work queue should have processed pod.") gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchEvents) tCtx.ExpectNoError(testPodDeletionsMetrics(controller, 0)) } @@ -1822,78 +1864,6 @@ func TestRetry(t *testing.T) { }) } -// TestRetry covers the scenario that an eviction attempt fails. -func TestEvictionFailure(t *testing.T) { - tCtx := ktesting.Init(t) - - tCtx.SyncTest("", func(tCtx ktesting.TContext) { - // This scenario is the same as "evict-pod-resourceclaim" above. 
- pod := podWithClaimName.DeepCopy() - fakeClientset := fake.NewSimpleClientset( - sliceTainted, - slice2, - inUseClaim, - pod, - ) - tCtx = ktesting.WithClients(tCtx, nil, nil, fakeClientset, nil, nil) - - pod, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{}) - require.NoError(tCtx, err, "get pod before eviction") - assert.Equal(tCtx, podWithClaimName, pod, "test pod") - - var mutex sync.Mutex - var podGets int - var podDeletions int - - fakeClientset.PrependReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - mutex.Lock() - defer mutex.Unlock() - podGets++ - podName := action.(core.GetAction).GetName() - assert.Equal(t, podWithClaimName.Name, podName, "name of patched pod") - return false, nil, nil - }) - fakeClientset.PrependReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - mutex.Lock() - defer mutex.Unlock() - podDeletions++ - podName := action.(core.DeleteAction).GetName() - assert.Equal(t, podWithClaimName.Name, podName, "name of deleted pod") - return true, nil, apierrors.NewInternalError(errors.New("fake error")) - }) - controller := newTestController(tCtx, fakeClientset) - - var wg sync.WaitGroup - defer func() { - t.Log("Waiting for goroutine termination...") - tCtx.Cancel("time to stop") - wg.Wait() - }() - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") - }() - - // Block until eviction has started. - // Eventually deletion is attempted a few times. - ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) int { - mutex.Lock() - defer mutex.Unlock() - return podDeletions - }).WithTimeout(30*time.Second).Should(gomega.BeNumerically(">=", retries), "pod eviction failed") - - // Now we can check the API calls. - // The background goroutined must be done when Wait returns, - // otherwise Wait wouldn't return. 
- tCtx.Wait() - assert.Equal(tCtx, retries, podGets, "number of pod get calls") - assert.Equal(tCtx, retries, podDeletions, "number of pod delete calls") - gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent()) - tCtx.ExpectNoError(testPodDeletionsMetrics(controller, 0)) - }) -} - // BenchTaintUntaint checks the full flow of detecting a claim as // tainted because of a new DeviceTaintRule, starting to evict its // consumer, and then undoing that when the DeviceTaintRule is removed.