Merge pull request #135983 from Goend/master

Fix the issue of slow creation of ResourceClaim in specific scenarios
This commit is contained in:
Kubernetes Prow Robot 2026-01-07 20:05:46 +05:30 committed by GitHub
commit 18663b347e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 245 additions and 29 deletions

View file

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
v1informers "k8s.io/client-go/informers/core/v1"
@ -53,9 +54,12 @@ import (
)
const (
// podResourceClaimIndex is the lookup name for the index function which indexes by pod ResourceClaim templates.
// podResourceClaimIndex is the lookup name for the index function which indexes by pod ResourceClaim.
podResourceClaimIndex = "pod-resource-claim-index"
// podResourceClaimTemplateIndexKey is the lookup name for the index function which indexes only by pod ResourceClaim templates.
podResourceClaimTemplateIndexKey = "pod-resource-claim-template-index"
// podResourceClaimAnnotation is the special annotation that generated
// ResourceClaims get. Its value is the pod.spec.resourceClaims[].name
// for which it was generated. This is used only inside the controller
@ -170,24 +174,43 @@ func NewController(
}
if _, err := claimInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
logger.V(6).Info("new claim", "claimDump", obj)
logger.V(6).Info("New claim", "claimDump", obj)
ec.enqueueResourceClaim(logger, nil, obj)
},
UpdateFunc: func(old, updated interface{}) {
logger.V(6).Info("updated claim", "claimDump", updated)
logger.V(6).Info("Updated claim", "claimDump", updated)
ec.enqueueResourceClaim(logger, old, updated)
},
DeleteFunc: func(obj interface{}) {
logger.V(6).Info("deleted claim", "claimDump", obj)
logger.V(6).Info("Deleted claim", "claimDump", obj)
ec.enqueueResourceClaim(logger, obj, nil)
},
}, cache.HandlerOptions{Logger: &logger}); err != nil {
return nil, err
}
if _, err := templateInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
logger.V(6).Info("New claim template", "claimTemplateDump", obj)
ec.enqueueResourceClaimTemplate(logger, obj)
},
UpdateFunc: func(old, updated interface{}) {
logger.V(6).Info("Updated claim template", "claimTemplateDump", updated)
ec.enqueueResourceClaimTemplate(logger, updated)
},
DeleteFunc: func(obj interface{}) {
logger.V(6).Info("Deleted claim template", "claimTemplateDump", obj)
},
}, cache.HandlerOptions{Logger: &logger}); err != nil {
return nil, err
}
if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil {
return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
}
if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimTemplateIndexKey: podResourceClaimTemplateIndexFunc}); err != nil {
return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
}
// The mutation cache acts as an additional layer for the informer
// cache and after a create made by the controller returns that
// object until the informer catches up. That is necessary
@ -213,6 +236,47 @@ func NewController(
return ec, nil
}
// enqueueResourceClaimTemplate handles an added or updated
// ResourceClaimTemplate. It looks up all pods in the pod indexer which
// reference the template (same namespace, same template name) and hands each
// of them to enqueuePod, which then decides whether the pod needs work.
//
// obj must be a *resourceapi.ResourceClaimTemplate or a
// cache.DeletedFinalStateUnknown wrapping one. Anything else gets reported
// as an error and is otherwise ignored.
func (ec *Controller) enqueueResourceClaimTemplate(logger klog.Logger, obj interface{}) {
	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = d.Obj
	}
	template, ok := obj.(*resourceapi.ResourceClaimTemplate)
	if !ok {
		// Not a template?!
		runtime.HandleErrorWithLogger(logger, nil, "EnqueueResourceClaimTemplate called for unexpected object", "type", fmt.Sprintf("%T", obj))
		return
	}

	logger.V(6).Info("ResourceClaimTemplate added or updated", "resourceClaimTemplate", klog.KObj(template))

	// Find all pods referencing this template. The index key has the same
	// namespace/name format that podResourceClaimTemplateIndexFunc produces.
	objects, err := ec.podIndexer.ByIndex(podResourceClaimTemplateIndexKey, fmt.Sprintf("%s/%s", template.Namespace, template.Name))
	if err != nil {
		runtime.HandleErrorWithLogger(logger, err, "Unable to list pods for claim template", "resourceClaimTemplate", klog.KObj(template))
		return
	}
	if len(objects) == 0 {
		logger.V(6).Info("ResourceClaimTemplate change unrelated to any known pod", "resourceClaimTemplate", klog.KObj(template))
		return
	}

	for _, object := range objects {
		pod, isPod := object.(*v1.Pod)
		if !isPod {
			// Not a pod?! Report the type of the indexer entry itself,
			// not the type of the template which triggered the lookup.
			runtime.HandleErrorWithLogger(logger, nil, "EnqueueResourceClaimTemplate called for unexpected object", "type", fmt.Sprintf("%T", object))
			return
		}
		logger.V(4).Info(
			"Enqueuing pod due to ResourceClaimTemplate change",
			"resourceClaimTemplate", klog.KObj(template),
			"pod", klog.KObj(pod),
		)
		ec.enqueuePod(logger, object, false)
	}
}
func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bool) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
@ -220,7 +284,7 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
pod, ok := obj.(*v1.Pod)
if !ok {
// Not a pod?!
logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj))
logger.Error(nil, "EnqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj))
return
}
@ -238,11 +302,11 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
}
if deleted {
logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod))
logger.V(6).Info("Pod got deleted", "pod", klog.KObj(pod))
ec.deletedObjects.Add(pod.UID)
}
logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)
logger.V(6).Info("Pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)
// Release reservations of a deleted or completed pod?
needsClaims, reason := podNeedsClaims(pod, deleted)
@ -282,11 +346,11 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
needsWork, reason := ec.podNeedsWork(pod)
if needsWork {
logger.V(6).Info("enqueing pod", "pod", klog.KObj(pod), "reason", reason)
logger.V(6).Info("Enqueueing pod", "pod", klog.KObj(pod), "reason", reason)
ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
return
}
logger.V(6).Info("not enqueing pod", "pod", klog.KObj(pod), "reason", reason)
logger.V(6).Info("Not enqueueing pod", "pod", klog.KObj(pod), "reason", reason)
}
func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) {
@ -388,21 +452,21 @@ func (ec *Controller) enqueueResourceClaim(logger klog.Logger, oldObj, newObj in
// stale pods in ReservedFor. During an update, a pod might get added
// that already no longer exists.
key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
logger.V(6).Info("enqueing new or updated claim", "claim", klog.KObj(claim), "key", key)
logger.V(6).Info("Enqueueing new or updated claim", "claim", klog.KObj(claim), "key", key)
ec.queue.Add(key)
} else {
logger.V(6).Info("not enqueing deleted claim", "claim", klog.KObj(claim))
logger.V(6).Info("Not enqueueing deleted claim", "claim", klog.KObj(claim))
}
// Also check whether this causes work for any of the currently
// known pods which use the ResourceClaim.
objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
if err != nil {
logger.Error(err, "listing pods from cache")
logger.Error(err, "Failed to list pods from cache")
return
}
if len(objs) == 0 {
logger.V(6).Info("unrelated to any known pod", "claim", klog.KObj(claim))
logger.V(6).Info("ResourceClaim change unrelated to any known pod", "claim", klog.KObj(claim))
return
}
for _, obj := range objs {
@ -495,7 +559,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error
pod, err := ec.podLister.Pods(namespace).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("nothing to do for pod, it is gone")
logger.V(5).Info("Nothing to do for pod, it is gone")
return nil
}
return err
@ -503,7 +567,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error
// Ignore pods which are already getting deleted.
if pod.DeletionTimestamp != nil {
logger.V(5).Info("nothing to do for pod, it is marked for deletion")
logger.V(5).Info("Nothing to do for pod, it is marked for deletion")
return nil
}
@ -532,7 +596,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error
if pod.Spec.NodeName == "" {
// Scheduler will handle reservations.
logger.V(5).Info("nothing to do for pod, scheduler will deal with it")
logger.V(5).Info("Nothing to do for pod, scheduler will deal with it")
return nil
}
@ -561,7 +625,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error
if claim.Status.Allocation != nil &&
!resourceclaim.IsReservedForPod(pod, claim) &&
resourceclaim.CanBeReserved(claim) {
logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim))
logger.V(5).Info("Reserve claim for pod", "resourceClaim", klog.KObj(claim))
if err := ec.reserveForPod(ctx, pod, claim); err != nil {
return err
}
@ -575,7 +639,7 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error
func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name)
ctx = klog.NewContext(ctx, logger)
logger.V(5).Info("checking", "podClaim", podClaim.Name)
logger.V(5).Info("Checking", "podClaim", podClaim.Name)
// resourceclaim.Name checks for the situation that the client doesn't
// know some future addition to the API. Therefore it gets called here
@ -606,10 +670,10 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
}
if err == nil {
// Already created, nothing more to do.
logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName)
logger.V(5).Info("Claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName)
return nil
}
logger.Error(err, "claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName)
logger.Error(err, "Claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName)
}
}
@ -771,7 +835,7 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err
claim, err := ec.claimLister.ResourceClaims(namespace).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("nothing to do for claim, it is gone")
logger.V(5).Info("Nothing to do for claim, it is gone")
return nil
}
return err
@ -815,14 +879,14 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err
return err
}
if pod == nil || pod.UID != reservedFor.UID {
logger.V(6).Info("remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
logger.V(6).Info("Remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
keepEntry = false
}
case pod.UID != reservedFor.UID:
logger.V(6).Info("remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
logger.V(6).Info("Remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
keepEntry = false
case isPodDone(pod):
logger.V(6).Info("remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
logger.V(6).Info("Remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
keepEntry = false
}
}
@ -838,7 +902,7 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err
}
builtinControllerFinalizer := slices.Index(claim.Finalizers, resourceapi.Finalizer)
logger.V(5).Info("claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid), "builtinController", builtinControllerFinalizer >= 0)
logger.V(5).Info("Claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid), "builtinController", builtinControllerFinalizer >= 0)
if len(valid) < len(claim.Status.ReservedFor) {
// This is not using a patch because we want the update to fail if anything
// changed in the meantime.
@ -921,23 +985,23 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err
if pod.UID != podUID || isPodDone(pod) {
// We are certain that the owning pod is not going to need
// the claim and therefore remove the claim.
logger.V(5).Info("deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod))
logger.V(5).Info("Deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod))
err := ec.kubeClient.ResourceV1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("delete claim %s: %w", klog.KObj(claim), err)
}
} else {
logger.V(6).Info("wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod)
logger.V(6).Info("Wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod)
}
case apierrors.IsNotFound(err):
// We might not know the pod *yet*. Instead of doing an expensive API call,
// let the garbage collector handle the case that the pod is truly gone.
logger.V(5).Info("pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName))
logger.V(5).Info("Pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName))
default:
return fmt.Errorf("lookup pod: %v", err)
}
} else {
logger.V(5).Info("claim not generated for a pod", "claim", klog.KObj(claim))
logger.V(5).Info("Claim not generated for a pod", "claim", klog.KObj(claim))
}
}
@ -955,6 +1019,24 @@ func owningPod(claim *resourceapi.ResourceClaim) (string, types.UID) {
return "", ""
}
// podResourceClaimTemplateIndexFunc is an index function that returns
// ResourceClaimTemplate keys (= namespace/name) for the templates referenced
// by pod.spec.resourceClaims of a given pod. Entries without a template name
// are skipped. Each key is returned once, in sorted order, even if several
// entries reference the same template. Objects which are not pods yield no
// keys.
func podResourceClaimTemplateIndexFunc(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}

	// A set de-duplicates templates referenced more than once by the same pod.
	keys := sets.New[string]()
	for _, podClaim := range pod.Spec.ResourceClaims {
		if podClaim.ResourceClaimTemplateName == nil {
			continue
		}
		keys.Insert(fmt.Sprintf("%s/%s", pod.Namespace, *podClaim.ResourceClaimTemplateName))
	}
	return sets.List(keys), nil
}
// podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (=
// namespace/name) for ResourceClaim or ResourceClaimTemplates in a given pod.
func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {

View file

@ -70,6 +70,7 @@ var (
testClaimReserved = reserveClaim(testClaimAllocated, testPodWithResource)
testClaimReservedTwice = reserveClaim(testClaimReserved, otherTestPod)
testClaimKey = claimKeyPrefix + testClaim.Namespace + "/" + testClaim.Name
testPodKey = podKeyPrefix + testNamespace + "/" + testPodName
templatedTestClaim = makeTemplatedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName+"-", testNamespace, className, 1, makeOwnerReference(testPodWithResource, true), nil)
templatedTestClaimAllocated = allocateClaim(templatedTestClaim)
@ -498,6 +499,139 @@ func TestSyncHandler(t *testing.T) {
}
}
// TestResourceClaimTemplateEventHandler checks the interaction between the
// ResourceClaimTemplate event handler and the pod index keyed by template:
// creating a template must enqueue exactly those pods which reference a
// template with that name in the same namespace, and pods in other
// namespaces must not be affected.
func TestResourceClaimTemplateEventHandler(t *testing.T) {
	tCtx := ktesting.Init(t)
	tCtx = ktesting.WithCancel(tCtx)

	// Second namespace, used to verify that template events only affect
	// pods in the namespace of the template.
	tmpNamespace := "tmp"

	fakeKubeClient := createTestClient()
	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
	podInformer := informerFactory.Core().V1().Pods()
	claimInformer := informerFactory.Resource().V1().ResourceClaims()
	templateInformer := informerFactory.Resource().V1().ResourceClaimTemplates()
	claimTemplateClient := fakeKubeClient.ResourceV1().ResourceClaimTemplates(testNamespace)
	claimTemplateTmpClient := fakeKubeClient.ResourceV1().ResourceClaimTemplates(tmpNamespace)
	podClient := fakeKubeClient.CoreV1().Pods(testNamespace)
	podTmpClient := fakeKubeClient.CoreV1().Pods(tmpNamespace)

	ec, err := NewController(tCtx.Logger(), Features{}, fakeKubeClient, podInformer, claimInformer, templateInformer)
	tCtx.ExpectNoError(err, "creating ephemeral controller")

	informerFactory.Start(tCtx.Done())
	stopInformers := func() {
		tCtx.Cancel("stopping informers")
		informerFactory.Shutdown()
	}
	defer stopInformers()

	// expectQueue waits until the work queue holds exactly expectedKeys (in
	// that order) and the template index holds exactly as many index values
	// as expectedIndexerKeys, then removes the keys from the queue and checks
	// that each expected index key maps to at least one pod referencing it.
	expectQueue := func(tCtx ktesting.TContext, expectedKeys []string, expectedIndexerKeys []string) {
		g := gomega.NewWithT(tCtx)
		tCtx.Helper()

		// lenDiffMessage is only passed to gomega as failure description.
		// It drains the queue to be able to show its content.
		lenDiffMessage := func() string {
			actualKeys := []string{}
			for ec.queue.Len() > 0 {
				actual, _ := ec.queue.Get()
				actualKeys = append(actualKeys, actual)
				ec.queue.Forget(actual)
				ec.queue.Done(actual)
			}
			return "Workqueue does not contain expected number of elements\n" +
				"Diff of elements (- expected, + actual):\n" +
				diff.Diff(expectedKeys, actualKeys)
		}

		g.Eventually(ec.queue.Len).
			WithTimeout(5*time.Second).
			Should(gomega.Equal(len(expectedKeys)), lenDiffMessage)
		g.Consistently(ec.queue.Len).
			WithTimeout(1*time.Second).
			Should(gomega.Equal(len(expectedKeys)), lenDiffMessage)

		g.Eventually(func() int { return len(ec.podIndexer.ListIndexFuncValues(podResourceClaimTemplateIndexKey)) }).
			WithTimeout(5 * time.Second).
			Should(gomega.Equal(len(expectedIndexerKeys)))
		g.Consistently(func() int { return len(ec.podIndexer.ListIndexFuncValues(podResourceClaimTemplateIndexKey)) }).
			WithTimeout(1 * time.Second).
			Should(gomega.Equal(len(expectedIndexerKeys)))

		for _, expected := range expectedKeys {
			actual, shuttingDown := ec.queue.Get()
			g.Expect(shuttingDown).To(gomega.BeFalseBecause("workqueue is unexpectedly shutting down"))
			g.Expect(actual).To(gomega.Equal(expected))
			ec.queue.Forget(actual)
			ec.queue.Done(actual)
		}

		for _, src := range expectedIndexerKeys {
			objects, err := ec.podIndexer.ByIndex(podResourceClaimTemplateIndexKey, src)
			g.Expect(err).NotTo(gomega.HaveOccurred(), "should not error when getting objects by index for key %s", src)
			g.Expect(objects).NotTo(gomega.BeEmpty(), "should have at least one object indexed for key %s", src)

			// Verify that the indexed objects are the expected pods.
			found := false
			for _, obj := range objects {
				pod, ok := obj.(*v1.Pod)
				if !ok {
					continue
				}
				// Check if this pod matches the expected template reference.
				for _, claim := range pod.Spec.ResourceClaims {
					if claim.ResourceClaimTemplateName != nil {
						// Build the expected index key for this pod.
						expectedKey := pod.Namespace + "/" + *claim.ResourceClaimTemplateName
						if expectedKey == src {
							found = true
							break
						}
					}
				}
			}
			g.Expect(found).To(gomega.BeTrueBecause("should find a pod with template %s in index for key %s", templateName, src))
		}
	}

	expectQueue(tCtx, []string{}, []string{})

	// Create two pods which reference a template with the same name:
	// - testPodWithResource in the test namespace
	// - fake-1 in the tmp namespace
	_, err = podClient.Create(tCtx, testPodWithResource, metav1.CreateOptions{})
	_, err1 := podTmpClient.Create(tCtx, makePod("fake-1", tmpNamespace, "uidpod2", *makePodResourceClaim(podResourceClaimName, templateName)), metav1.CreateOptions{})
	ktesting.Step(tCtx, "create pod", func(tCtx ktesting.TContext) {
		tCtx.ExpectNoError(err)
		tCtx.ExpectNoError(err1)
		expectQueue(tCtx, []string{testPodKey, podKeyPrefix + tmpNamespace + "/" + "fake-1"}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName})
	})

	// The items have been forgotten and marked as done in the workqueue,
	// so the queue is empty.
	ktesting.Step(tCtx, "expect queue is nil", func(tCtx ktesting.TContext) {
		expectQueue(tCtx, []string{}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName})
	})

	// After creating the claim template in the test namespace, the queue
	// should contain only the test pod key.
	_, err = claimTemplateClient.Create(tCtx, template, metav1.CreateOptions{})
	ktesting.Step(tCtx, "create claim template after pod backoff", func(tCtx ktesting.TContext) {
		tCtx.ExpectNoError(err)
		expectQueue(tCtx, []string{testPodKey}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName})
	})

	// The item has been forgotten and marked as done in the workqueue,
	// so the queue is empty.
	ktesting.Step(tCtx, "expect queue is nil", func(tCtx ktesting.TContext) {
		expectQueue(tCtx, []string{}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName})
	})

	// After creating the claim template in the tmp namespace, the queue
	// should contain only the key of the pod in the tmp namespace.
	tmpNamespaceTemplate := makeTemplate(templateName, tmpNamespace, className, nil)
	_, err = claimTemplateTmpClient.Create(tCtx, tmpNamespaceTemplate, metav1.CreateOptions{})
	ktesting.Step(tCtx, "create claim template in tmp namespace after pod backoff in test namespace", func(tCtx ktesting.TContext) {
		tCtx.ExpectNoError(err)
		expectQueue(tCtx, []string{podKeyPrefix + tmpNamespace + "/" + "fake-1"}, []string{testNamespace + "/" + templateName, tmpNamespace + "/" + templateName})
	})
}
func TestResourceClaimEventHandler(t *testing.T) {
tCtx := ktesting.Init(t)
tCtx = ktesting.WithCancel(tCtx)