Merge pull request #136270 from bart0sh/PR213-DRA-refactor-getting-claims

DRA Kubelet: refactor getting claims
This commit is contained in:
Kubernetes Prow Robot 2026-02-27 06:47:57 +05:30 committed by GitHub
commit ee22853ef9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 130 additions and 72 deletions

View file

@ -50,6 +50,7 @@ const (
cdiID = "test-driver/test=cdi-test-device" // CDI device ID
poolName = "test-pool"
requestName = "test-request"
requestName2 = "test-request-2"
claimName = "test-claim"
claimUID = types.UID(claimName + "-uid")
podUID = "test-pod-uid"

View file

@ -416,7 +416,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error {
err := m.cache.withLock(logger, func() error {
info, exists := m.cache.get(claim.Name, claim.Namespace)
if !exists {
return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", claim.Name)
return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s in namespace %s", claim.Name, claim.Namespace)
}
for _, device := range result.GetDevices() {
info.addDevice(plugin.DriverName(), state.Device{PoolName: device.PoolName,
@ -443,7 +443,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error {
for _, claim := range resourceClaims {
info, exists := m.cache.get(claim.Name, claim.Namespace)
if !exists {
return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", claim.Name)
return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s in namespace %s", claim.Name, claim.Namespace)
}
info.setPrepared()
}
@ -478,8 +478,25 @@ func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
cdiDevices := []kubecontainer.CDIDevice{}
// claimRequests maps claimName -> []requestNames for this container
claimRequests := make(map[string][]string)
// Collect regular ResourceClaims
containerClaimsMap := make(map[string][]string, len(container.Resources.Claims))
for _, claim := range container.Resources.Claims {
if _, ok := containerClaimsMap[claim.Name]; !ok {
containerClaimsMap[claim.Name] = []string{}
}
containerClaimsMap[claim.Name] = append(containerClaimsMap[claim.Name], claim.Request)
}
for i := range pod.Spec.ResourceClaims {
podClaim := &pod.Spec.ResourceClaims[i]
requests, ok := containerClaimsMap[podClaim.Name]
if !ok {
continue
}
claimName, _, err := resourceclaim.Name(pod, podClaim)
if err != nil {
// No wrapping, error is already informative.
@ -491,61 +508,54 @@ func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*Container
if claimName == nil {
continue
}
for _, claim := range container.Resources.Claims {
if podClaim.Name != claim.Name {
continue
}
err := m.cache.withRLock(func() error {
claimInfo, exists := m.cache.get(*claimName, pod.Namespace)
if !exists {
return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", *claimName)
}
// As of Kubernetes 1.31, CDI device IDs are not passed via annotations anymore.
cdiDevices = append(cdiDevices, claimInfo.cdiDevicesAsList(claim.Request)...)
return nil
})
if err != nil {
// No wrapping, this is the error above.
return nil, err
}
}
claimRequests[*claimName] = append(claimRequests[*claimName], requests...)
}
// Collect extended resource claims if feature is enabled
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DRAExtendedResource) && pod.Status.ExtendedResourceClaimStatus != nil {
claimName := pod.Status.ExtendedResourceClaimStatus.ResourceClaimName
// if the container has requests for extended resources backed by DRA,
// they must have been allocated via the extendedResourceClaim created
// by the kube-scheduler.
// Build map of this container's extended resource requests
extendedResourceRequests := make(map[string]bool)
for rName, rValue := range container.Resources.Requests {
if !rValue.IsZero() && schedutil.IsDRAExtendedResourceName(rName) {
extendedResourceRequests[rName.String()] = true
}
}
// Collect request names matching this container's extended resources
for _, rm := range pod.Status.ExtendedResourceClaimStatus.RequestMappings {
// allow multiple device requests per container per resource.
if rm.ContainerName == container.Name && extendedResourceRequests[rm.ResourceName] {
claimRequests[claimName] = append(claimRequests[claimName], rm.RequestName)
}
}
}
// Process all collected claims
for claimName, requestNames := range claimRequests {
err := m.cache.withRLock(func() error {
claimInfo, exists := m.cache.get(claimName, pod.Namespace)
if !exists {
return fmt.Errorf("unable to get claim info for claim %s in namespace %s", claimName, pod.Namespace)
return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s in namespace %s", claimName, pod.Namespace)
}
for rName, rValue := range container.Resources.Requests {
if rValue.IsZero() {
// We only care about the resources requested by the pod
continue
}
if schedutil.IsDRAExtendedResourceName(rName) {
for _, rm := range pod.Status.ExtendedResourceClaimStatus.RequestMappings {
// allow multiple device requests per container per resource.
if rm.ContainerName == container.Name && rm.ResourceName == rName.String() {
// As of Kubernetes 1.31, CDI device IDs are not passed via annotations anymore.
cdiDevices = append(cdiDevices, claimInfo.cdiDevicesAsList(rm.RequestName)...)
}
}
}
for _, requestName := range requestNames {
// As of Kubernetes 1.31, CDI device IDs are not passed via annotations anymore.
cdiDevices = append(cdiDevices, claimInfo.cdiDevicesAsList(requestName)...)
}
return nil
})
if err != nil {
// No wrapping, this is the error above.
return nil, err
}
}
return &ContainerInfo{CDIDevices: cdiDevices}, nil
}
@ -711,7 +721,18 @@ func (m *Manager) PodMightNeedToUnprepareResources(uid types.UID) bool {
func (m *Manager) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims))
// Build a map of container claims for O(1) lookup
containerClaimsMap := make(map[string]bool, len(container.Resources.Claims))
for _, claim := range container.Resources.Claims {
containerClaimsMap[claim.Name] = true
}
for i, podResourceClaim := range pod.Spec.ResourceClaims {
// Only process claims that this container actually uses
if !containerClaimsMap[podResourceClaim.Name] {
continue
}
claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
if err != nil {
// No wrapping, the error is already informative.
@ -725,23 +746,17 @@ func (m *Manager) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) (
// Ownership doesn't get checked here, this should have been done before.
for _, claim := range container.Resources.Claims {
if podResourceClaim.Name != claim.Name {
continue
}
err := m.cache.withRLock(func() error {
claimInfo, exists := m.cache.get(*claimName, pod.Namespace)
if !exists {
return fmt.Errorf("unable to get information for ResourceClaim %s", *claimName)
}
claimInfos = append(claimInfos, claimInfo.DeepCopy())
return nil
})
if err != nil {
// No wrapping, this is the error above.
return nil, err
err = m.cache.withRLock(func() error {
claimInfo, exists := m.cache.get(*claimName, pod.Namespace)
if !exists {
return fmt.Errorf("unable to get information for ResourceClaim %s", *claimName)
}
claimInfos = append(claimInfos, claimInfo.DeepCopy())
return nil
})
if err != nil {
// No wrapping, this is the error above.
return nil, err
}
}
@ -804,18 +819,18 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat
}
}
// Build a map of pod claim statuses for O(1) lookup
podClaimStatusMap := make(map[string]string, len(pod.Status.ResourceClaimStatuses))
for _, podClaimStatus := range pod.Status.ResourceClaimStatuses {
if podClaimStatus.ResourceClaimName != nil {
podClaimStatusMap[podClaimStatus.Name] = *podClaimStatus.ResourceClaimName
}
}
// Iterate through the claims requested by this specific container.
for _, claim := range containerSpec.Resources.Claims {
// Find the actual name of the ResourceClaim object.
var actualClaimName string
for _, podClaimStatus := range pod.Status.ResourceClaimStatuses {
if podClaimStatus.Name == claim.Name {
if podClaimStatus.ResourceClaimName != nil {
actualClaimName = *podClaimStatus.ResourceClaimName
}
break
}
}
actualClaimName := podClaimStatusMap[claim.Name]
if actualClaimName == "" {
logger.V(4).Info("Could not find generated name for resource claim in pod status", "container", containerSpec.Name, "claimName", claim.Name)

View file

@ -48,6 +48,7 @@ import (
metricstestutil "k8s.io/component-base/metrics/testutil"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
drahealthv1alpha1 "k8s.io/kubelet/pkg/apis/dra-health/v1alpha1"
@ -638,11 +639,12 @@ func TestGetResources(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
for _, test := range []struct {
description string
container *v1.Container
pod *v1.Pod
claimInfo *ClaimInfo
wantErr bool
description string
container *v1.Container
pod *v1.Pod
claimInfo *ClaimInfo
wantErr bool
wantCDIDevices []kubecontainer.CDIDevice
}{
{
description: "claim info with devices",
@ -656,8 +658,9 @@ func TestGetResources(t *testing.T) {
},
},
},
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, nil, false, nil),
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, nil, false, nil),
wantCDIDevices: []kubecontainer.CDIDevice{{Name: cdiID}},
},
{
description: "nil claiminfo",
@ -684,8 +687,47 @@ func TestGetResources(t *testing.T) {
},
},
},
pod: genTestPodWithExtendedResource(),
claimInfo: genTestClaimInfoWithExtendedResource(nil, false),
pod: genTestPodWithExtendedResource(),
claimInfo: genTestClaimInfoWithExtendedResource(nil, false),
wantCDIDevices: []kubecontainer.CDIDevice{{Name: cdiID}},
},
{
description: "same claim referenced by multiple requests",
container: &v1.Container{
Name: containerName,
Resources: v1.ResourceRequirements{
Claims: []v1.ResourceClaim{
{
Name: claimName,
Request: requestName,
},
{
Name: claimName,
Request: requestName2,
},
},
},
},
pod: genTestPod(),
claimInfo: &ClaimInfo{
ClaimInfoState: state.ClaimInfoState{
ClaimUID: claimUID,
ClaimName: claimName,
Namespace: namespace,
PodUIDs: sets.New[string](),
DriverState: map[string]state.DriverState{
driverName: {
Devices: []state.Device{{
PoolName: poolName,
DeviceName: deviceName,
RequestNames: []string{requestName, requestName2},
CDIDeviceIDs: []string{cdiID},
}},
},
},
},
},
wantCDIDevices: []kubecontainer.CDIDevice{{Name: cdiID}, {Name: cdiID}},
},
} {
t.Run(test.description, func(t *testing.T) {
@ -703,7 +745,7 @@ func TestGetResources(t *testing.T) {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, test.claimInfo.DriverState[driverName].Devices[0].CDIDeviceIDs[0], containerInfo.CDIDevices[0].Name)
assert.Equal(t, test.wantCDIDevices, containerInfo.CDIDevices)
}
})
}