diff --git a/pkg/controller/devicetainteviction/device_taint_eviction.go b/pkg/controller/devicetainteviction/device_taint_eviction.go index 6c9b201773e..bff689e108e 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction.go @@ -48,6 +48,7 @@ import ( resourcealphalisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/klog/v2" apipod "k8s.io/kubernetes/pkg/api/v1/pod" @@ -57,9 +58,12 @@ import ( ) const ( - // retries is the number of times that the controller tries to delete a pod - // that needs to be evicted. - retries = 5 + // This is a compromise between getting work done and not overwhelming the apiserver + // and pod informers. Integration testing with 100 workers modified pods so quickly + // that a watch in the integration test couldn't keep up: + // cacher.go:855] cacher (pods): 100 objects queued in incoming channel. + // cache_watcher.go:203] Forcing pods watcher close due to unresponsiveness: key: "/pods/", labels: "", fields: "". len(c.input) = 10, len(c.result) = 10, graceful = false + numWorkers = 10 ) // Controller listens to Taint changes of DRA devices and Toleration changes of ResourceClaims, @@ -94,7 +98,9 @@ type Controller struct { classInformer resourceinformers.DeviceClassInformer ruleLister resourcealphalisters.DeviceTaintRuleLister haveSynced []cache.InformerSynced + hasSynced atomic.Int32 metrics metrics.Metrics + workqueue workqueue.TypedRateLimitingInterface[workItem] // evictPod ensures that the pod gets evicted at the specified time. // It doesn't block. @@ -104,13 +110,24 @@ type Controller struct { // Idempotent, returns false if there was nothing to cancel. cancelEvict func(pod tainteviction.NamespacedObject) bool + // mutex protects the following shared data structures. + mutex sync.Mutex + + // deletePodAt maps a pod to the time when it is meant to be evicted. + // + // The entry for pod gets deleted when eviction is no longer necessary + // and updated when the time changes. + deletePodAt map[tainteviction.NamespacedObject]time.Time + + // maybeDeletePodCount counts how often a worker checked a pod. + // This is useful for unit testing, but probably not a good public metric. + maybeDeletePodCount int64 + // allocatedClaims holds all currently known allocated claims. allocatedClaims map[types.NamespacedName]allocatedClaim // A value is slightly more efficient in BenchmarkTaintUntaint (less allocations!). // pools indexes all slices by driver and pool name. pools map[poolID]pool - - hasSynced atomic.Int32 } type poolID struct { @@ -218,35 +235,77 @@ type allocatedClaim struct { evictionTime *metav1.Time } -func (tc *Controller) deletePodHandler(c clientset.Interface, emitEventFunc func(tainteviction.NamespacedObject)) func(ctx context.Context, fireAt time.Time, args *tainteviction.WorkArgs) error { - return func(ctx context.Context, fireAt time.Time, args *tainteviction.WorkArgs) error { - var err error - for i := 0; i < retries; i++ { - if i > 0 { - time.Sleep(10 * time.Millisecond) - } - err = addConditionAndDeletePod(ctx, c, args.Object, &emitEventFunc) - if apierrors.IsNotFound(err) { - // Not a problem, the work is done. - // But we didn't do it, so don't - // bump the metric. - return nil - } - if err == nil { - podDeletionLatency := time.Since(fireAt) - // TODO: include more information why it was evicted. - klog.FromContext(ctx).Info("Evicted pod by deleting it", "pod", args.Object, "latency", podDeletionLatency) - tc.metrics.PodDeletionsTotal.Inc() - tc.metrics.PodDeletionsLatency.Observe(float64(podDeletionLatency.Seconds())) - return nil - } - } - return err - } +// workItem is stored in a workqueue and describes some piece of work which +// needs to be done. +// +// Right now that work is deleting pods. +// Updating DeviceTaintRule status will be added later. +type workItem struct { + // podRef references a pod which may need to be deleted. + // + // Controller.deletePodAt is the source of truth for if and when the pod really needs to be removed. + podRef tainteviction.NamespacedObject } -func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef tainteviction.NamespacedObject, emitEventFunc *func(tainteviction.NamespacedObject)) (err error) { - pod, err := c.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{}) +// maybeDeletePod checks whether the pod needs to be deleted now and if so, does it. +// Three results are possible: +// - an error if anything goes wrong and the operation needs to be repeated +// - a positive delay if the operation needs to be repeated in the future +// - a zero delay if the deletion is done or no longer necessary +func (tc *Controller) maybeDeletePod(ctx context.Context, podRef tainteviction.NamespacedObject) (againAfter time.Duration, finalErr error) { + logger := klog.FromContext(ctx) + + // We must not hold this mutex while doing blocking API calls. + // TODO: try an atomic map instead. + tc.mutex.Lock() + tc.maybeDeletePodCount++ + fireAt, ok := tc.deletePodAt[podRef] + tc.mutex.Unlock() + logger.V(5).Info("Processing pod deletion work item", "pod", podRef, "active", ok, "fireAt", fireAt) + + if !ok { + logger.V(5).Info("Work item for pod deletion obsolete, nothing to do", "pod", podRef) + return 0, nil + } + + now := time.Now() + againAfter = fireAt.Sub(now) + if againAfter > 0 { + // Not yet. Maybe the fireAt time got updated. + return againAfter, nil + } + + defer func() { + if finalErr == nil { + // Forget the deletion time, we are done. + tc.mutex.Lock() + delete(tc.deletePodAt, podRef) + tc.mutex.Unlock() + } + }() + + err := tc.addConditionAndDeletePod(ctx, podRef) + if apierrors.IsNotFound(err) { + // Not a problem, the work is done. + // But we didn't do it, so don't + // bump the metric. + return 0, nil + } + if err != nil { + return 0, err + } + + podDeletionLatency := time.Since(fireAt) + // TODO: include more information why it was evicted. + klog.FromContext(ctx).Info("Evicted pod by deleting it", "pod", podRef, "latency", podDeletionLatency) + tc.metrics.PodDeletionsTotal.Inc() + tc.metrics.PodDeletionsLatency.Observe(float64(podDeletionLatency.Seconds())) + + return 0, nil +} + +func (tc *Controller) addConditionAndDeletePod(ctx context.Context, podRef tainteviction.NamespacedObject) (err error) { + pod, err := tc.client.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{}) if err != nil { return err } @@ -258,12 +317,14 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef return apierrors.NewNotFound(v1.SchemeGroupVersion.WithResource("pods").GroupResource(), pod.Name) } - // Emit the event only once, and only if we are actually doing something. - if *emitEventFunc != nil { - (*emitEventFunc)(podRef) - *emitEventFunc = nil + if pod.DeletionTimestamp != nil { + // Already deleted, no need to evict. + return nil } + // Emit the event only if we are actually doing something. + tc.emitPodDeletionEvent(podRef) + newStatus := pod.Status.DeepCopy() updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{ Type: v1.DisruptionTarget, @@ -272,7 +333,7 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef Message: "Device Taint manager: deleting due to NoExecute taint", }) if updated { - if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { + if _, _, _, err := utilpod.PatchPodStatus(ctx, tc.client, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { return err } } @@ -280,7 +341,7 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef // another pod using the same name in the meantime. Include a precondition // to prevent that race. This delete attempt then fails and the next one detects // the new pod and stops retrying. - return c.CoreV1().Pods(podRef.Namespace).Delete(ctx, podRef.Name, metav1.DeleteOptions{ + return tc.client.CoreV1().Pods(podRef.Namespace).Delete(ctx, podRef.Name, metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{ UID: &podRef.UID, }, @@ -303,6 +364,7 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo taintInformer: taintInformer, classInformer: classInformer, ruleLister: taintInformer.Lister(), + deletePodAt: make(map[tainteviction.NamespacedObject]time.Time), allocatedClaims: make(map[types.NamespacedName]allocatedClaim), pools: make(map[poolID]pool), // Instantiate all informers now to ensure that they get started. @@ -325,9 +387,12 @@ func (tc *Controller) Run(ctx context.Context) error { defer utilruntime.HandleCrash() logger := klog.FromContext(ctx) logger.Info("Starting", "controller", tc.name) - defer logger.Info("Shutting down controller", "controller", tc.name) + defer logger.Info("Shut down controller", "controller", tc.name, "reason", context.Cause(ctx)) tc.logger = logger + var wg sync.WaitGroup + defer wg.Wait() + // Doing debug logging? if loggerV := logger.V(6); loggerV.Enabled() { tc.eventLogger = &loggerV @@ -341,21 +406,47 @@ func (tc *Controller) Run(ctx context.Context) error { tc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: tc.name}).WithLogger(logger) defer eventBroadcaster.Shutdown() - taintEvictionQueue := tainteviction.CreateWorkerQueue(tc.deletePodHandler(tc.client, tc.emitPodDeletionEvent)) + queueLogger := klog.LoggerWithName(logger, "workqueue") + delayingQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[workItem]{ + Logger: &queueLogger, + Name: tc.name, + }) + tc.workqueue = workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[workItem](), + workqueue.TypedRateLimitingQueueConfig[workItem]{ + Name: tc.name, + DelayingQueue: delayingQueue, + }, + ) + defer func() { + logger.V(3).Info("Shutting down work queue") + tc.workqueue.ShutDown() + }() + evictPod := tc.evictPod tc.evictPod = func(podRef tainteviction.NamespacedObject, fireAt time.Time) { + tc.deletePodAt[podRef] = fireAt + now := time.Now() + tc.workqueue.AddAfter(workItem{podRef: podRef}, fireAt.Sub(now)) + // Only relevant for testing. if evictPod != nil { evictPod(podRef, fireAt) } - taintEvictionQueue.UpdateWork(ctx, &tainteviction.WorkArgs{Object: podRef}, time.Now(), fireAt) } cancelEvict := tc.cancelEvict tc.cancelEvict = func(podRef tainteviction.NamespacedObject) bool { + _, ok := tc.deletePodAt[podRef] + if !ok { + // Nothing to cancel. + return false + } + delete(tc.deletePodAt, podRef) if cancelEvict != nil { cancelEvict(podRef) } - return taintEvictionQueue.CancelWork(logger, podRef.NamespacedName.String()) + // Cannot remove from a work queue. The worker will detect that the entry is obsolete by checking deletePodAt. + return true } // Start events processing pipeline. @@ -369,9 +460,6 @@ func (tc *Controller) Run(ctx context.Context) error { } defer eventBroadcaster.Shutdown() - // mutex serializes event processing. - var mutex sync.Mutex - claimHandler, err := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { claim, ok := obj.(*resourceapi.ResourceClaim) @@ -379,8 +467,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleClaimChange(nil, claim) }, UpdateFunc: func(oldObj, newObj any) { @@ -393,8 +481,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleClaimChange(oldClaim, newClaim) }, DeleteFunc: func(obj any) { @@ -406,8 +494,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleClaimChange(claim, nil) }, }) @@ -426,8 +514,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handlePodChange(nil, pod) }, UpdateFunc: func(oldObj, newObj any) { @@ -440,8 +528,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handlePodChange(oldPod, newPod) }, DeleteFunc: func(obj any) { @@ -453,8 +541,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handlePodChange(pod, nil) }, }) @@ -473,8 +561,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected DeviceTaintRule", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleRuleChange(nil, rule) }, UpdateFunc: func(oldObj, newObj any) { @@ -487,8 +575,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { logger.Error(nil, "Expected DeviceTaintRule", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleRuleChange(oldRule, newRule) }, DeleteFunc: func(obj any) { @@ -500,8 +588,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected DeviceTaintRule", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleRuleChange(rule, nil) }, }) @@ -520,8 +608,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleSliceChange(nil, slice) }, UpdateFunc: func(oldObj, newObj any) { @@ -534,8 +622,8 @@ func (tc *Controller) Run(ctx context.Context) error { if !ok { logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", newObj)) } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleSliceChange(oldSlice, newSlice) }, DeleteFunc: func(obj any) { @@ -545,8 +633,8 @@ func (tc *Controller) Run(ctx context.Context) error { logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", obj)) return } - mutex.Lock() - defer mutex.Unlock() + tc.mutex.Lock() + defer tc.mutex.Unlock() tc.handleSliceChange(slice, nil) }, }) @@ -565,10 +653,50 @@ func (tc *Controller) Run(ctx context.Context) error { logger.V(1).Info("Underlying informers have synced") tc.hasSynced.Store(1) + for i := range numWorkers { + wg.Go(func() { + tc.worker(klog.NewContext(ctx, klog.LoggerWithName(queueLogger, fmt.Sprintf("worker-%d", i)))) + }) + } + <-ctx.Done() return nil } +// worker blocks until the workqueue is shut down. +// Cancellation of the context only aborts on-going work. +func (tc *Controller) worker(ctx context.Context) { + logger := klog.FromContext(ctx) + defer utilruntime.HandleCrashWithLogger(logger) + + for { + item, shutdown := tc.workqueue.Get() + if shutdown { + return + } + + func() { + defer tc.workqueue.Done(item) + + var err error + var againAfter time.Duration + if item.podRef.Name != "" { + againAfter, err = tc.maybeDeletePod(ctx, item.podRef) + } + switch { + case err != nil: + logger.V(3).Info("Evicting pod failed, will retry", "err", err) + tc.workqueue.AddRateLimited(item) + case againAfter > 0: + logger.V(5).Info("Checking pod eviction again later", "delay", againAfter) + tc.workqueue.AddAfter(item, againAfter) + default: + tc.workqueue.Forget(item) + } + }() + } +} + func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) { claim := newClaim if claim == nil { diff --git a/pkg/controller/devicetainteviction/device_taint_eviction_test.go b/pkg/controller/devicetainteviction/device_taint_eviction_test.go index 9795925ad40..fdf616d0d75 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction_test.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction_test.go @@ -1593,22 +1593,45 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) { slice := sliceTainted.DeepCopy() slice.Spec.Devices[0].Taints[0].TimeAdded = &metav1.Time{Time: time.Now()} claim := inUseClaim.DeepCopy() + tolerationSeconds := int64(60) claim.Status.Allocation.Devices.Results[0].Tolerations = []resourceapi.DeviceToleration{{ Operator: resourceapi.DeviceTolerationOpExists, Effect: resourceapi.DeviceTaintEffectNoExecute, - TolerationSeconds: ptr.To(int64(60)), + TolerationSeconds: &tolerationSeconds, }} fakeClientset := fake.NewSimpleClientset( slice, claim, pod, ) - tCtx = ktesting.WithClients(tCtx, nil, nil, fakeClientset, nil, nil) - pod, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{}) require.NoError(tCtx, err, "get pod before eviction") assert.Equal(tCtx, podWithClaimName, pod, "test pod") + var podGets int + var podUpdates int + var podDeletions int + + fakeClientset.PrependReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + podGets++ + podName := action.(core.GetAction).GetName() + assert.Equal(tCtx, podWithClaimName.Name, podName, "name of pod to patch") + return false, nil, nil + }) + fakeClientset.PrependReactor("patch", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + podUpdates++ + podName := action.(core.PatchAction).GetName() + assert.Equal(tCtx, podWithClaimName.Name, podName, "name of pod to get") + return false, nil, nil + }) + fakeClientset.PrependReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + podDeletions++ + podName := action.(core.DeleteAction).GetName() + assert.Equal(tCtx, podWithClaimName.Name, podName, "name of pod to delete") + return false, nil, nil + }) + + tCtx = ktesting.WithClients(tCtx, nil, nil, fakeClientset, nil, nil) controller := newTestController(tCtx, fakeClientset) var mutex sync.Mutex @@ -1666,10 +1689,29 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) { ktesting.Eventually(tCtx, listEvents).WithTimeout(30 * time.Second).Should(matchCancellationEvent()) } tCtx.Wait() + matchEvents := matchCancellationEvent() if deletePod { matchEvents = gomega.BeEmpty() + assert.Equal(tCtx, 1, podDeletions, "Pod should have been deleted exactly once by test.") + } else { + assert.Equal(tCtx, 0, podDeletions, "Pod should not have been deleted.") } + + // Naively (?) one could expect synctest.Wait to have blocked until the work item added via AddAfter + // got processed because before that the overall state isn't stable yet. But the workqueue package + // seems to implement AddAfter in a way which is not detected as "blocking on time to pass" by + // by synctest and therefore it returns without advancing time enough. + // + // Here we trigger that manually as a workaround (?). The factor doesn't really matter. + // Commenting this out causes the controller.maybeDeletePodCount check to fail. + time.Sleep(10 * time.Duration(tolerationSeconds) * time.Second) + tCtx.Wait() + + assert.Equal(tCtx, 0, podGets, "Worker should not have needed to get the pod.") + assert.Equal(tCtx, 0, podUpdates, "Worker should not have needed to update the pod.") + assert.Equal(tCtx, 0, controller.workqueue.Len(), "Work queue should be empty now.") + assert.Equal(tCtx, int64(1), controller.maybeDeletePodCount, "Work queue should have processed pod.") gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchEvents) tCtx.ExpectNoError(testPodDeletionsMetrics(controller, 0)) } @@ -1822,78 +1864,6 @@ func TestRetry(t *testing.T) { }) } -// TestRetry covers the scenario that an eviction attempt fails. -func TestEvictionFailure(t *testing.T) { - tCtx := ktesting.Init(t) - - tCtx.SyncTest("", func(tCtx ktesting.TContext) { - // This scenario is the same as "evict-pod-resourceclaim" above. - pod := podWithClaimName.DeepCopy() - fakeClientset := fake.NewSimpleClientset( - sliceTainted, - slice2, - inUseClaim, - pod, - ) - tCtx = ktesting.WithClients(tCtx, nil, nil, fakeClientset, nil, nil) - - pod, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{}) - require.NoError(tCtx, err, "get pod before eviction") - assert.Equal(tCtx, podWithClaimName, pod, "test pod") - - var mutex sync.Mutex - var podGets int - var podDeletions int - - fakeClientset.PrependReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - mutex.Lock() - defer mutex.Unlock() - podGets++ - podName := action.(core.GetAction).GetName() - assert.Equal(t, podWithClaimName.Name, podName, "name of patched pod") - return false, nil, nil - }) - fakeClientset.PrependReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - mutex.Lock() - defer mutex.Unlock() - podDeletions++ - podName := action.(core.DeleteAction).GetName() - assert.Equal(t, podWithClaimName.Name, podName, "name of deleted pod") - return true, nil, apierrors.NewInternalError(errors.New("fake error")) - }) - controller := newTestController(tCtx, fakeClientset) - - var wg sync.WaitGroup - defer func() { - t.Log("Waiting for goroutine termination...") - tCtx.Cancel("time to stop") - wg.Wait() - }() - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") - }() - - // Block until eviction has started. - // Eventually deletion is attempted a few times. - ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) int { - mutex.Lock() - defer mutex.Unlock() - return podDeletions - }).WithTimeout(30*time.Second).Should(gomega.BeNumerically(">=", retries), "pod eviction failed") - - // Now we can check the API calls. - // The background goroutined must be done when Wait returns, - // otherwise Wait wouldn't return. - tCtx.Wait() - assert.Equal(tCtx, retries, podGets, "number of pod get calls") - assert.Equal(tCtx, retries, podDeletions, "number of pod delete calls") - gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent()) - tCtx.ExpectNoError(testPodDeletionsMetrics(controller, 0)) - }) -} - // BenchTaintUntaint checks the full flow of detecting a claim as // tainted because of a new DeviceTaintRule, starting to evict its // consumer, and then undoing that when the DeviceTaintRule is removed.