diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index 2d7d33e0c20..eb87e217656 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" corev1apply "k8s.io/client-go/applyconfigurations/core/v1" v1informers "k8s.io/client-go/informers/core/v1" @@ -53,9 +54,12 @@ import ( ) const ( - // podResourceClaimIndex is the lookup name for the index function which indexes by pod ResourceClaim templates. + // podResourceClaimIndex is the lookup name for the index function which indexes by pod ResourceClaim. podResourceClaimIndex = "pod-resource-claim-index" + // podResourceClaimTemplateIndexKey is the lookup name for the index function which indexes only by pod ResourceClaim templates. + podResourceClaimTemplateIndexKey = "pod-resource-claim-template-index" + // podResourceClaimAnnotation is the special annotation that generated // ResourceClaims get. Its value is the pod.spec.resourceClaims[].name // for which it was generated. This is used only inside the controller @@ -170,24 +174,43 @@ func NewController( } if _, err := claimInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - logger.V(6).Info("new claim", "claimDump", obj) + logger.V(6).Info("New claim", "claimDump", obj) ec.enqueueResourceClaim(logger, nil, obj) }, UpdateFunc: func(old, updated interface{}) { - logger.V(6).Info("updated claim", "claimDump", updated) + logger.V(6).Info("Updated claim", "claimDump", updated) ec.enqueueResourceClaim(logger, old, updated) }, DeleteFunc: func(obj interface{}) { - logger.V(6).Info("deleted claim", "claimDump", obj) + logger.V(6).Info("Deleted claim", "claimDump", obj) ec.enqueueResourceClaim(logger, obj, nil) }, }, cache.HandlerOptions{Logger: &logger}); err != nil { return nil, err } + if _, err := templateInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + logger.V(6).Info("New claim template", "claimTemplateDump", obj) + ec.enqueueResourceClaimTemplate(logger, obj) + }, + UpdateFunc: func(old, updated interface{}) { + logger.V(6).Info("Updated claim template", "claimTemplateDump", updated) + ec.enqueueResourceClaimTemplate(logger, updated) + }, + DeleteFunc: func(obj interface{}) { + logger.V(6).Info("Deleted claim template", "claimTemplateDump", obj) + }, + }, cache.HandlerOptions{Logger: &logger}); err != nil { + return nil, err + } if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil { return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err) } + if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimTemplateIndexKey: podResourceClaimTemplateIndexFunc}); err != nil { + return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err) + } + // The mutation cache acts as an additional layer for the informer // cache and after a create made by the controller returns that // object until the informer catches up. That is necessary @@ -213,6 +236,47 @@ func NewController( return ec, nil } +func (ec *Controller) enqueueResourceClaimTemplate(logger klog.Logger, obj interface{}) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = d.Obj + } + template, ok := obj.(*resourceapi.ResourceClaimTemplate) + if !ok { + // Not a template,return + runtime.HandleErrorWithLogger(logger, nil, "EnqueueResourceClaimTemplate called for unexpected object", "type", fmt.Sprintf("%T", obj)) + return + } + + logger.V(6).Info("ResourceClaimTemplate added or updated", "resourceClaimTemplate", klog.KObj(template)) + + // Enqueue all pods with this template name + objects, err := ec.podIndexer.ByIndex(podResourceClaimTemplateIndexKey, fmt.Sprintf("%s/%s", template.Namespace, template.Name)) + if err != nil { + runtime.HandleErrorWithLogger(logger, err, "Unable to list pods for claim template", "resourceClaimTemplate", klog.KObj(template)) + return + } + if len(objects) == 0 { + logger.V(6).Info("ResourceClaimTemplate change unrelated to any known pod", "resourceClaimTemplate", klog.KObj(template)) + return + } + + for _, object := range objects { + pod, ok1 := object.(*v1.Pod) + if !ok1 { + // Not a pod?! + runtime.HandleErrorWithLogger(logger, nil, "EnqueueResourceClaimTemplate called for unexpected object", "type", fmt.Sprintf("%T", obj)) + return + } + logger.V(4).Info( + "Enqueuing pod due to ResourceClaim change", + "resourceClaimTemplate", klog.KObj(template), + "pod", klog.KObj(pod), + ) + ec.enqueuePod(logger, object, false) + } + +} + func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bool) { if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { obj = d.Obj @@ -220,7 +284,7 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo pod, ok := obj.(*v1.Pod) if !ok { // Not a pod?! - logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj)) + logger.Error(nil, "EnqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj)) return } @@ -238,11 +302,11 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo } if deleted { - logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod)) + logger.V(6).Info("Pod got deleted", "pod", klog.KObj(pod)) ec.deletedObjects.Add(pod.UID) } - logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted) + logger.V(6).Info("Pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted) // Release reservations of a deleted or completed pod? needsClaims, reason := podNeedsClaims(pod, deleted) @@ -282,11 +346,11 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo needsWork, reason := ec.podNeedsWork(pod) if needsWork { - logger.V(6).Info("enqueing pod", "pod", klog.KObj(pod), "reason", reason) + logger.V(6).Info("Enqueueing pod", "pod", klog.KObj(pod), "reason", reason) ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name) return } - logger.V(6).Info("not enqueing pod", "pod", klog.KObj(pod), "reason", reason) + logger.V(6).Info("Not enqueueing pod", "pod", klog.KObj(pod), "reason", reason) } func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) { @@ -388,21 +452,21 @@ func (ec *Controller) enqueueResourceClaim(logger klog.Logger, oldObj, newObj in // stale pods in ReservedFor. During an update, a pod might get added // that already no longer exists. key := claimKeyPrefix + claim.Namespace + "/" + claim.Name - logger.V(6).Info("enqueing new or updated claim", "claim", klog.KObj(claim), "key", key) + logger.V(6).Info("Enqueueing new or updated claim", "claim", klog.KObj(claim), "key", key) ec.queue.Add(key) } else { - logger.V(6).Info("not enqueing deleted claim", "claim", klog.KObj(claim)) + logger.V(6).Info("Not enqueueing deleted claim", "claim", klog.KObj(claim)) } // Also check whether this causes work for any of the currently // known pods which use the ResourceClaim. objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)) if err != nil { - logger.Error(err, "listing pods from cache") + logger.Error(err, "Failed to list pods from cache") return } if len(objs) == 0 { - logger.V(6).Info("unrelated to any known pod", "claim", klog.KObj(claim)) + logger.V(6).Info("ResourceClaim change unrelated to any known pod", "claim", klog.KObj(claim)) return } for _, obj := range objs { @@ -495,7 +559,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error pod, err := ec.podLister.Pods(namespace).Get(name) if err != nil { if apierrors.IsNotFound(err) { - logger.V(5).Info("nothing to do for pod, it is gone") + logger.V(5).Info("Nothing to do for pod, it is gone") return nil } return err @@ -503,7 +567,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error // Ignore pods which are already getting deleted. if pod.DeletionTimestamp != nil { - logger.V(5).Info("nothing to do for pod, it is marked for deletion") + logger.V(5).Info("Nothing to do for pod, it is marked for deletion") return nil } @@ -532,7 +596,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error if pod.Spec.NodeName == "" { // Scheduler will handle reservations. - logger.V(5).Info("nothing to do for pod, scheduler will deal with it") + logger.V(5).Info("Nothing to do for pod, scheduler will deal with it") return nil } @@ -561,7 +625,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error if claim.Status.Allocation != nil && !resourceclaim.IsReservedForPod(pod, claim) && resourceclaim.CanBeReserved(claim) { - logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim)) + logger.V(5).Info("Reserve claim for pod", "resourceClaim", klog.KObj(claim)) if err := ec.reserveForPod(ctx, pod, claim); err != nil { return err } @@ -575,7 +639,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name) ctx = klog.NewContext(ctx, logger) - logger.V(5).Info("checking", "podClaim", podClaim.Name) + logger.V(5).Info("Checking", "podClaim", podClaim.Name) // resourceclaim.Name checks for the situation that the client doesn't // know some future addition to the API. Therefore it gets called here @@ -606,10 +670,10 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1. } if err == nil { // Already created, nothing more to do. - logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName) + logger.V(5).Info("Claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName) return nil } - logger.Error(err, "claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName) + logger.Error(err, "Claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName) } } @@ -771,7 +835,7 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err claim, err := ec.claimLister.ResourceClaims(namespace).Get(name) if err != nil { if apierrors.IsNotFound(err) { - logger.V(5).Info("nothing to do for claim, it is gone") + logger.V(5).Info("Nothing to do for claim, it is gone") return nil } return err @@ -815,14 +879,14 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err return err } if pod == nil || pod.UID != reservedFor.UID { - logger.V(6).Info("remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) + logger.V(6).Info("Remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false } case pod.UID != reservedFor.UID: - logger.V(6).Info("remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) + logger.V(6).Info("Remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false case isPodDone(pod): - logger.V(6).Info("remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) + logger.V(6).Info("Remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false } } @@ -838,7 +902,7 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err } builtinControllerFinalizer := slices.Index(claim.Finalizers, resourceapi.Finalizer) - logger.V(5).Info("claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid), "builtinController", builtinControllerFinalizer >= 0) + logger.V(5).Info("Claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid), "builtinController", builtinControllerFinalizer >= 0) if len(valid) < len(claim.Status.ReservedFor) { // This is not using a patch because we want the update to fail if anything // changed in the meantime. @@ -921,23 +985,23 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err if pod.UID != podUID || isPodDone(pod) { // We are certain that the owning pod is not going to need // the claim and therefore remove the claim. - logger.V(5).Info("deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod)) + logger.V(5).Info("Deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod)) err := ec.kubeClient.ResourceV1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("delete claim %s: %w", klog.KObj(claim), err) } } else { - logger.V(6).Info("wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod) + logger.V(6).Info("Wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod) } case apierrors.IsNotFound(err): // We might not know the pod *yet*. Instead of doing an expensive API call, // let the garbage collector handle the case that the pod is truly gone. - logger.V(5).Info("pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName)) + logger.V(5).Info("Pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName)) default: return fmt.Errorf("lookup pod: %v", err) } } else { - logger.V(5).Info("claim not generated for a pod", "claim", klog.KObj(claim)) + logger.V(5).Info("Claim not generated for a pod", "claim", klog.KObj(claim)) } } @@ -955,6 +1019,24 @@ func owningPod(claim *resourceapi.ResourceClaim) (string, types.UID) { return "", "" } +func podResourceClaimTemplateIndexFunc(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil + } + + keySet := sets.NewString() + + for _, podClaim := range pod.Spec.ResourceClaims { + if podClaim.ResourceClaimTemplateName != nil { + resourceTemplate := *podClaim.ResourceClaimTemplateName + keySet.Insert(fmt.Sprintf("%s/%s", pod.Namespace, resourceTemplate)) + } + } + + return keySet.List(), nil +} + // podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (= // namespace/name) for ResourceClaim or ResourceClaimTemplates in a given pod. func podResourceClaimIndexFunc(obj interface{}) ([]string, error) { diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go index 33e207effd7..8208fb6b328 100644 --- a/pkg/controller/resourceclaim/controller_test.go +++ b/pkg/controller/resourceclaim/controller_test.go @@ -70,6 +70,7 @@ var ( testClaimReserved = reserveClaim(testClaimAllocated, testPodWithResource) testClaimReservedTwice = reserveClaim(testClaimReserved, otherTestPod) testClaimKey = claimKeyPrefix + testClaim.Namespace + "/" + testClaim.Name + testPodKey = podKeyPrefix + testNamespace + "/" + testPodName templatedTestClaim = makeTemplatedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName+"-", testNamespace, className, 1, makeOwnerReference(testPodWithResource, true), nil) templatedTestClaimAllocated = allocateClaim(templatedTestClaim) @@ -498,6 +499,139 @@ func TestSyncHandler(t *testing.T) { } } +func TestResourceClaimTemplateEventHandler(t *testing.T) { + tCtx := ktesting.Init(t) + tCtx = ktesting.WithCancel(tCtx) + + fakeKubeClient := createTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + podInformer := informerFactory.Core().V1().Pods() + claimInformer := informerFactory.Resource().V1().ResourceClaims() + templateInformer := informerFactory.Resource().V1().ResourceClaimTemplates() + claimTemplateClient := fakeKubeClient.ResourceV1().ResourceClaimTemplates(testNamespace) + claimTemplateTmpClient := fakeKubeClient.ResourceV1().ResourceClaimTemplates("tmp") + podClient := fakeKubeClient.CoreV1().Pods(testNamespace) + podTmpClient := fakeKubeClient.CoreV1().Pods("tmp") + + ec, err := NewController(tCtx.Logger(), Features{}, fakeKubeClient, podInformer, claimInformer, templateInformer) + tCtx.ExpectNoError(err, "creating ephemeral controller") + + informerFactory.Start(tCtx.Done()) + stopInformers := func() { + tCtx.Cancel("stopping informers") + informerFactory.Shutdown() + } + defer stopInformers() + + expectQueue := func(tCtx ktesting.TContext, expectedKeys []string, expectedIndexerKeys []string) { + g := gomega.NewWithT(tCtx) + tCtx.Helper() + + lenDiffMessage := func() string { + actualKeys := []string{} + for ec.queue.Len() > 0 { + actual, _ := ec.queue.Get() + actualKeys = append(actualKeys, actual) + ec.queue.Forget(actual) + ec.queue.Done(actual) + } + return "Workqueue does not contain expected number of elements\n" + + "Diff of elements (- expected, + actual):\n" + + diff.Diff(expectedKeys, actualKeys) + } + + g.Eventually(ec.queue.Len). + WithTimeout(5*time.Second). + Should(gomega.Equal(len(expectedKeys)), lenDiffMessage) + g.Consistently(ec.queue.Len). + WithTimeout(1*time.Second). + Should(gomega.Equal(len(expectedKeys)), lenDiffMessage) + + g.Eventually(func() int { return len(ec.podIndexer.ListIndexFuncValues(podResourceClaimTemplateIndexKey)) }). + WithTimeout(5 * time.Second). + Should(gomega.Equal(len(expectedIndexerKeys))) + g.Consistently(func() int { return len(ec.podIndexer.ListIndexFuncValues(podResourceClaimTemplateIndexKey)) }). + WithTimeout(1 * time.Second). + Should(gomega.Equal(len(expectedIndexerKeys))) + + for _, expected := range expectedKeys { + actual, shuttingDown := ec.queue.Get() + g.Expect(shuttingDown).To(gomega.BeFalseBecause("workqueue is unexpectedly shutting down")) + g.Expect(actual).To(gomega.Equal(expected)) + ec.queue.Forget(actual) + ec.queue.Done(actual) + } + + for _, src := range expectedIndexerKeys { + objects, err := ec.podIndexer.ByIndex(podResourceClaimTemplateIndexKey, src) + g.Expect(err).NotTo(gomega.HaveOccurred(), "should not error when getting objects by index for key %s", src) + g.Expect(objects).NotTo(gomega.BeEmpty(), "should have at least one object indexed for key %s", src) + + // Verify that the indexed objects are the expected pods + found := false + for _, obj := range objects { + pod, ok := obj.(*v1.Pod) + if !ok { + continue + } + // Check if this pod matches the expected template reference + for _, claim := range pod.Spec.ResourceClaims { + if claim.ResourceClaimTemplateName != nil { + // Build the expected index key for this pod + expectedKey := pod.Namespace + "/" + *claim.ResourceClaimTemplateName + if expectedKey == src { + found = true + break + } + } + } + } + g.Expect(found).To(gomega.BeTrueBecause("should find a pod with template %s in index for key %s", templateName, src)) + } + } + + tmpNamespace := "tmp" + + expectQueue(tCtx, []string{}, []string{}) + + // Create two pods: + // - testPodWithResource in the my-namespace namespace + // - fake-1 in the tmp namespace + _, err = podClient.Create(tCtx, testPodWithResource, metav1.CreateOptions{}) + _, err1 := podTmpClient.Create(tCtx, makePod("fake-1", tmpNamespace, "uidpod2", *makePodResourceClaim(podResourceClaimName, templateName)), metav1.CreateOptions{}) + ktesting.Step(tCtx, "create pod", func(tCtx ktesting.TContext) { + tCtx.ExpectNoError(err) + tCtx.ExpectNoError(err1) + expectQueue(tCtx, []string{testPodKey, podKeyPrefix + tmpNamespace + "/" + "fake-1"}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName}) + }) + + // The item has been forgotten and marked as done in the workqueue,so queue is nil + ktesting.Step(tCtx, "expect queue is nil", func(tCtx ktesting.TContext) { + expectQueue(tCtx, []string{}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName}) + }) + + // After create claim template,queue should have test pod key + _, err = claimTemplateClient.Create(tCtx, template, metav1.CreateOptions{}) + ktesting.Step(tCtx, "create claim template after pod backoff", func(tCtx ktesting.TContext) { + tCtx.ExpectNoError(err) + expectQueue(tCtx, []string{testPodKey}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName}) + }) + + // The item has been forgotten and marked as done in the workqueue,so queue is nil + ktesting.Step(tCtx, "expect queue is nil", func(tCtx ktesting.TContext) { + expectQueue(tCtx, []string{}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName}) + }) + + // After create tmp namespace claim template,queue should have fake pod key + TmpNamespaceTemplate := makeTemplate(templateName, "tmp", className, nil) + _, err = claimTemplateTmpClient.Create(tCtx, TmpNamespaceTemplate, metav1.CreateOptions{}) + ktesting.Step(tCtx, "create claim template in tmp namespace after pod backoff in test namespace", func(tCtx ktesting.TContext) { + tCtx.ExpectNoError(err) + expectQueue(tCtx, []string{podKeyPrefix + tmpNamespace + "/" + "fake-1"}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName}) + }) + +} + func TestResourceClaimEventHandler(t *testing.T) { tCtx := ktesting.Init(t) tCtx = ktesting.WithCancel(tCtx)