diff --git a/pkg/kubelet/cm/dra/claiminfo_test.go b/pkg/kubelet/cm/dra/claiminfo_test.go index 1c9baf42026..21bd1b1221b 100644 --- a/pkg/kubelet/cm/dra/claiminfo_test.go +++ b/pkg/kubelet/cm/dra/claiminfo_test.go @@ -50,6 +50,7 @@ const ( cdiID = "test-driver/test=cdi-test-device" // CDI device ID poolName = "test-pool" requestName = "test-request" + requestName2 = "test-request-2" claimName = "test-claim" claimUID = types.UID(claimName + "-uid") podUID = "test-pod-uid" diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 8ef0fb26e8c..c6417304c95 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -416,7 +416,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error { err := m.cache.withLock(logger, func() error { info, exists := m.cache.get(claim.Name, claim.Namespace) if !exists { - return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", claim.Name) + return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s in namespace %s", claim.Name, claim.Namespace) } for _, device := range result.GetDevices() { info.addDevice(plugin.DriverName(), state.Device{PoolName: device.PoolName, @@ -443,7 +443,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error { for _, claim := range resourceClaims { info, exists := m.cache.get(claim.Name, claim.Namespace) if !exists { - return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", claim.Name) + return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s in namespace %s", claim.Name, claim.Namespace) } info.setPrepared() } @@ -478,8 +478,25 @@ func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim { func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) { cdiDevices := []kubecontainer.CDIDevice{} + // claimRequests maps claimName -> []requestNames for this container + claimRequests := make(map[string][]string) + + // Collect regular ResourceClaims + containerClaimsMap := make(map[string][]string, len(container.Resources.Claims)) + for _, claim := range container.Resources.Claims { + if _, ok := containerClaimsMap[claim.Name]; !ok { + containerClaimsMap[claim.Name] = []string{} + } + containerClaimsMap[claim.Name] = append(containerClaimsMap[claim.Name], claim.Request) + } + for i := range pod.Spec.ResourceClaims { podClaim := &pod.Spec.ResourceClaims[i] + requests, ok := containerClaimsMap[podClaim.Name] + if !ok { + continue + } + claimName, _, err := resourceclaim.Name(pod, podClaim) if err != nil { // No wrapping, error is already informative. @@ -491,61 +508,54 @@ func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*Container if claimName == nil { continue } - for _, claim := range container.Resources.Claims { - if podClaim.Name != claim.Name { - continue - } - err := m.cache.withRLock(func() error { - claimInfo, exists := m.cache.get(*claimName, pod.Namespace) - if !exists { - return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", *claimName) - } - - // As of Kubernetes 1.31, CDI device IDs are not passed via annotations anymore. - cdiDevices = append(cdiDevices, claimInfo.cdiDevicesAsList(claim.Request)...) - - return nil - }) - if err != nil { - // No wrapping, this is the error above. - return nil, err - } - } + claimRequests[*claimName] = append(claimRequests[*claimName], requests...) } + // Collect extended resource claims if feature is enabled if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DRAExtendedResource) && pod.Status.ExtendedResourceClaimStatus != nil { claimName := pod.Status.ExtendedResourceClaimStatus.ResourceClaimName // if the container has requests for extended resources backed by DRA, // they must have been allocated via the extendedResourceClaim created // by the kube-scheduler. + + // Build map of this container's extended resource requests + extendedResourceRequests := make(map[string]bool) + for rName, rValue := range container.Resources.Requests { + if !rValue.IsZero() && schedutil.IsDRAExtendedResourceName(rName) { + extendedResourceRequests[rName.String()] = true + } + } + + // Collect request names matching this container's extended resources + for _, rm := range pod.Status.ExtendedResourceClaimStatus.RequestMappings { + // allow multiple device requests per container per resource. + if rm.ContainerName == container.Name && extendedResourceRequests[rm.ResourceName] { + claimRequests[claimName] = append(claimRequests[claimName], rm.RequestName) + } + } + } + + // Process all collected claims + for claimName, requestNames := range claimRequests { err := m.cache.withRLock(func() error { claimInfo, exists := m.cache.get(claimName, pod.Namespace) if !exists { - return fmt.Errorf("unable to get claim info for claim %s in namespace %s", claimName, pod.Namespace) + return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s in namespace %s", claimName, pod.Namespace) } - for rName, rValue := range container.Resources.Requests { - if rValue.IsZero() { - // We only care about the resources requested by the pod - continue - } - if schedutil.IsDRAExtendedResourceName(rName) { - for _, rm := range pod.Status.ExtendedResourceClaimStatus.RequestMappings { - // allow multiple device requests per container per resource. - if rm.ContainerName == container.Name && rm.ResourceName == rName.String() { - // As of Kubernetes 1.31, CDI device IDs are not passed via annotations anymore. - cdiDevices = append(cdiDevices, claimInfo.cdiDevicesAsList(rm.RequestName)...) - } - } - } + for _, requestName := range requestNames { + // As of Kubernetes 1.31, CDI device IDs are not passed via annotations anymore. + cdiDevices = append(cdiDevices, claimInfo.cdiDevicesAsList(requestName)...) } return nil }) if err != nil { + // No wrapping, this is the error above. return nil, err } } + return &ContainerInfo{CDIDevices: cdiDevices}, nil } @@ -711,7 +721,18 @@ func (m *Manager) PodMightNeedToUnprepareResources(uid types.UID) bool { func (m *Manager) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) { claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims)) + // Build a map of container claims for O(1) lookup + containerClaimsMap := make(map[string]bool, len(container.Resources.Claims)) + for _, claim := range container.Resources.Claims { + containerClaimsMap[claim.Name] = true + } + for i, podResourceClaim := range pod.Spec.ResourceClaims { + // Only process claims that this container actually uses + if !containerClaimsMap[podResourceClaim.Name] { + continue + } + claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) if err != nil { // No wrapping, the error is already informative. @@ -725,23 +746,17 @@ func (m *Manager) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ( // Ownership doesn't get checked here, this should have been done before. - for _, claim := range container.Resources.Claims { - if podResourceClaim.Name != claim.Name { - continue - } - - err := m.cache.withRLock(func() error { - claimInfo, exists := m.cache.get(*claimName, pod.Namespace) - if !exists { - return fmt.Errorf("unable to get information for ResourceClaim %s", *claimName) - } - claimInfos = append(claimInfos, claimInfo.DeepCopy()) - return nil - }) - if err != nil { - // No wrapping, this is the error above. - return nil, err + err = m.cache.withRLock(func() error { + claimInfo, exists := m.cache.get(*claimName, pod.Namespace) + if !exists { + return fmt.Errorf("unable to get information for ResourceClaim %s", *claimName) } + claimInfos = append(claimInfos, claimInfo.DeepCopy()) + return nil + }) + if err != nil { + // No wrapping, this is the error above. + return nil, err } } @@ -804,18 +819,18 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat } } + // Build a map of pod claim statuses for O(1) lookup + podClaimStatusMap := make(map[string]string, len(pod.Status.ResourceClaimStatuses)) + for _, podClaimStatus := range pod.Status.ResourceClaimStatuses { + if podClaimStatus.ResourceClaimName != nil { + podClaimStatusMap[podClaimStatus.Name] = *podClaimStatus.ResourceClaimName + } + } + // Iterate through the claims requested by this specific container. for _, claim := range containerSpec.Resources.Claims { // Find the actual name of the ResourceClaim object. - var actualClaimName string - for _, podClaimStatus := range pod.Status.ResourceClaimStatuses { - if podClaimStatus.Name == claim.Name { - if podClaimStatus.ResourceClaimName != nil { - actualClaimName = *podClaimStatus.ResourceClaimName - } - break - } - } + actualClaimName := podClaimStatusMap[claim.Name] if actualClaimName == "" { logger.V(4).Info("Could not find generated name for resource claim in pod status", "container", containerSpec.Name, "claimName", claim.Name) diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index a04c6b75f5c..f6a4abbad09 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -48,6 +48,7 @@ import ( metricstestutil "k8s.io/component-base/metrics/testutil" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/klog/v2" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" drahealthv1alpha1 "k8s.io/kubelet/pkg/apis/dra-health/v1alpha1" @@ -638,11 +639,12 @@ func TestGetResources(t *testing.T) { kubeClient := fake.NewSimpleClientset() for _, test := range []struct { - description string - container *v1.Container - pod *v1.Pod - claimInfo *ClaimInfo - wantErr bool + description string + container *v1.Container + pod *v1.Pod + claimInfo *ClaimInfo + wantErr bool + wantCDIDevices []kubecontainer.CDIDevice }{ { description: "claim info with devices", @@ -656,8 +658,9 @@ func TestGetResources(t *testing.T) { }, }, }, - pod: genTestPod(), - claimInfo: genTestClaimInfo(claimUID, nil, false, nil), + pod: genTestPod(), + claimInfo: genTestClaimInfo(claimUID, nil, false, nil), + wantCDIDevices: []kubecontainer.CDIDevice{{Name: cdiID}}, }, { description: "nil claiminfo", @@ -684,8 +687,47 @@ func TestGetResources(t *testing.T) { }, }, }, - pod: genTestPodWithExtendedResource(), - claimInfo: genTestClaimInfoWithExtendedResource(nil, false), + pod: genTestPodWithExtendedResource(), + claimInfo: genTestClaimInfoWithExtendedResource(nil, false), + wantCDIDevices: []kubecontainer.CDIDevice{{Name: cdiID}}, + }, + { + description: "same claim referenced by multiple requests", + container: &v1.Container{ + Name: containerName, + Resources: v1.ResourceRequirements{ + Claims: []v1.ResourceClaim{ + { + Name: claimName, + Request: requestName, + }, + { + Name: claimName, + Request: requestName2, + }, + }, + }, + }, + pod: genTestPod(), + claimInfo: &ClaimInfo{ + ClaimInfoState: state.ClaimInfoState{ + ClaimUID: claimUID, + ClaimName: claimName, + Namespace: namespace, + PodUIDs: sets.New[string](), + DriverState: map[string]state.DriverState{ + driverName: { + Devices: []state.Device{{ + PoolName: poolName, + DeviceName: deviceName, + RequestNames: []string{requestName, requestName2}, + CDIDeviceIDs: []string{cdiID}, + }}, + }, + }, + }, + }, + wantCDIDevices: []kubecontainer.CDIDevice{{Name: cdiID}, {Name: cdiID}}, }, } { t.Run(test.description, func(t *testing.T) { @@ -703,7 +745,7 @@ func TestGetResources(t *testing.T) { assert.Error(t, err) } else { require.NoError(t, err) - assert.Equal(t, test.claimInfo.DriverState[driverName].Devices[0].CDIDeviceIDs[0], containerInfo.CDIDevices[0].Name) + assert.Equal(t, test.wantCDIDevices, containerInfo.CDIDevices) } }) }