Add tests to validate ShareID does not interfere with DRA Resource Health Status

Signed-off-by: Harshal Patil <12152047+harche@users.noreply.github.com>
This commit is contained in:
Harshal Patil 2026-02-04 15:26:25 -05:00
parent a773eb4838
commit 62710a14d5
2 changed files with 436 additions and 13 deletions

View file

@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
const (
@ -536,7 +537,7 @@ func genTestClaimWithExtendedResource(name, driver, device, podUID string) *reso
}
// genTestClaimInfo generates claim info object
func genTestClaimInfo(claimUID types.UID, podUIDs []string, prepared bool) *ClaimInfo {
func genTestClaimInfo(claimUID types.UID, podUIDs []string, prepared bool, shareID *types.UID) *ClaimInfo {
return &ClaimInfo{
ClaimInfoState: state.ClaimInfoState{
ClaimUID: claimUID,
@ -548,6 +549,7 @@ func genTestClaimInfo(claimUID types.UID, podUIDs []string, prepared bool) *Clai
Devices: []state.Device{{
PoolName: poolName,
DeviceName: deviceName,
ShareID: shareID,
RequestNames: []string{requestName},
CDIDeviceIDs: []string{cdiID},
}},
@ -655,7 +657,7 @@ func TestGetResources(t *testing.T) {
},
},
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, nil, false),
claimInfo: genTestClaimInfo(claimUID, nil, false, nil),
},
{
description: "nil claiminfo",
@ -851,7 +853,7 @@ dra_operations_duration_seconds_count{is_error="false",operation_name="PrepareRe
driverName: driverName,
pod: genTestPod(),
claim: genTestClaim(claimName, driverName, deviceName, podUID),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true, nil),
expectedClaimInfoState: genClaimInfoState(cdiID),
resp: genPrepareResourcesResponse(claimUID),
expectedMetric: `# HELP dra_operations_duration_seconds [ALPHA] Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.
@ -937,7 +939,7 @@ dra_operations_duration_seconds_count{is_error="false",operation_name="PrepareRe
driverName: driverName,
pod: genTestPod(),
claim: genTestClaim(claimName, driverName, deviceName, podUID),
claimInfo: genTestClaimInfo(anotherClaimUID, []string{podUID}, false),
claimInfo: genTestClaimInfo(anotherClaimUID, []string{podUID}, false, nil),
expectedErrMsg: fmt.Sprintf("old ResourceClaim with same name %s and different UID %s still exists", claimName, anotherClaimUID),
expectedMetric: `# HELP dra_operations_duration_seconds [ALPHA] Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.
# TYPE dra_operations_duration_seconds histogram
@ -1197,7 +1199,7 @@ dra_operations_duration_seconds_count{is_error="true",operation_name="UnprepareR
description: "resource claim referenced by other pod(s)",
driverName: driverName,
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, []string{podUID, "another-pod-uid"}, true),
claimInfo: genTestClaimInfo(claimUID, []string{podUID, "another-pod-uid"}, true, nil),
wantResourceSkipped: true,
expectedMetric: `# HELP dra_operations_duration_seconds [ALPHA] Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.
# TYPE dra_operations_duration_seconds histogram
@ -1210,7 +1212,7 @@ dra_operations_duration_seconds_count{is_error="false",operation_name="Unprepare
description: "should timeout",
driverName: driverName,
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true, nil),
wantTimeout: true,
expectedUnprepareCalls: 1,
expectedErrMsg: "NodeUnprepareResources: rpc error: code = DeadlineExceeded",
@ -1230,7 +1232,7 @@ dra_operations_duration_seconds_count{is_error="true",operation_name="UnprepareR
description: "should fail when driver returns empty response",
driverName: driverName,
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true, nil),
resp: &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{}},
expectedUnprepareCalls: 1,
expectedErrMsg: "NodeUnprepareResources skipped 1 ResourceClaims",
@ -1270,7 +1272,7 @@ dra_operations_duration_seconds_count{is_error="false",operation_name="Unprepare
driverName: driverName,
pod: genTestPod(),
claim: genTestClaim(claimName, driverName, deviceName, podUID),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true, nil),
expectedUnprepareCalls: 1,
expectedMetric: `# HELP dra_grpc_operations_duration_seconds [ALPHA] Duration in seconds of the DRA gRPC operations
# TYPE dra_grpc_operations_duration_seconds histogram
@ -1288,7 +1290,7 @@ dra_operations_duration_seconds_count{is_error="false",operation_name="Unprepare
description: "should unprepare resource when driver returns nil value",
driverName: driverName,
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, true, nil),
resp: &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{string(claimUID): nil}},
expectedUnprepareCalls: 1,
expectedMetric: `# HELP dra_grpc_operations_duration_seconds [ALPHA] Duration in seconds of the DRA gRPC operations
@ -1415,7 +1417,7 @@ func TestGetContainerClaimInfos(t *testing.T) {
description: "should get claim info",
expectedClaimName: claimName,
pod: genTestPod(),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, false),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, false, nil),
},
{
description: "should get extended resource claim info",
@ -1457,7 +1459,7 @@ func TestGetContainerClaimInfos(t *testing.T) {
},
},
},
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, false),
claimInfo: genTestClaimInfo(claimUID, []string{podUID}, false, nil),
expectedErrMsg: "none of the supported fields are set",
},
{
@ -1664,7 +1666,7 @@ func TestHandleWatchResourcesStream(t *testing.T) {
defer stCancel()
// Setup: Create a manager with a relevant claim already in its cache.
initialClaim := genTestClaimInfo(claimUID, []string{string(podUID)}, true)
initialClaim := genTestClaimInfo(claimUID, []string{string(podUID)}, true, nil)
manager, runStreamTest := setupNewManagerAndRunStreamTest(t, stCtx, initialClaim)
t.Log("HealthChangeForAllocatedDevice: Test Case Started")
@ -1781,7 +1783,7 @@ func TestHandleWatchResourcesStream(t *testing.T) {
defer stCancel()
// Setup: Manager with a claim and the device already marked Unhealthy in health cache
initialClaim := genTestClaimInfo(claimUID, []string{string(podUID)}, true)
initialClaim := genTestClaimInfo(claimUID, []string{string(podUID)}, true, nil)
manager, runStreamTest := setupNewManagerAndRunStreamTest(t, stCtx, initialClaim)
// Pre-populate health cache
@ -1897,6 +1899,192 @@ func TestHandleWatchResourcesStream(t *testing.T) {
require.Error(t, finalErr)
assert.True(t, errors.Is(finalErr, context.Canceled) || errors.Is(finalErr, context.DeadlineExceeded))
})
// Test Case 6: Health change for a device allocated with ShareID
// This test validates that ShareID does not interfere with health status reporting.
t.Run("HealthChangeForDeviceWithShareID", func(t *testing.T) {
stCtx, stCancel := context.WithCancel(overallTestCtx)
defer stCancel()
// Setup: Create a manager with a claim that has ShareID set on the device
testShareUID := types.UID("test-share-uid-for-health")
initialClaim := genTestClaimInfo(claimUID, []string{string(podUID)}, true, &testShareUID)
manager, runStreamTest := setupNewManagerAndRunStreamTest(t, stCtx, initialClaim)
t.Log("HealthChangeForDeviceWithShareID: Test Case Started")
responses := make(chan struct {
Resp *drahealthv1alpha1.NodeWatchResourcesResponse
Err error
}, 1)
updateChan, done, streamErrChan := runStreamTest(stCtx, responses)
// Send health update for the device (same device that has ShareID in the claim)
unhealthyDeviceMsg := &drahealthv1alpha1.DeviceHealth{
Device: &drahealthv1alpha1.DeviceIdentifier{
PoolName: poolName,
DeviceName: deviceName,
},
Health: drahealthv1alpha1.HealthStatus_UNHEALTHY,
LastUpdatedTime: time.Now().Unix(),
}
t.Logf("HealthChangeForDeviceWithShareID: Sending health update: %+v", unhealthyDeviceMsg)
responses <- struct {
Resp *drahealthv1alpha1.NodeWatchResourcesResponse
Err error
}{
Resp: &drahealthv1alpha1.NodeWatchResourcesResponse{Devices: []*drahealthv1alpha1.DeviceHealth{unhealthyDeviceMsg}},
}
t.Log("HealthChangeForDeviceWithShareID: Waiting for update on manager channel")
select {
case upd := <-updateChan:
t.Logf("HealthChangeForDeviceWithShareID: Received update: %+v", upd)
assert.ElementsMatch(t, []string{string(podUID)}, upd.PodUIDs, "Expected pod UID in update for device with ShareID")
case <-time.After(2 * time.Second):
t.Fatal("HealthChangeForDeviceWithShareID: Timeout waiting for pod update - ShareID may be interfering with health reporting")
}
// Verify health cache is updated correctly
cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName)
assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Health cache should be updated for device with ShareID")
// Verify the claim still has the ShareID set (it shouldn't be lost during health updates)
claimFromCache, exists := manager.cache.get(claimName, namespace)
require.True(t, exists, "Claim should still exist in cache")
devices := claimFromCache.DriverState[driverName].Devices
require.Len(t, devices, 1, "Claim should have one device")
assert.NotNil(t, devices[0].ShareID, "ShareID should still be set on the device")
assert.Equal(t, testShareUID, *devices[0].ShareID, "ShareID value should be preserved")
t.Log("HealthChangeForDeviceWithShareID: Closing responses channel to signal EOF")
close(responses)
t.Log("HealthChangeForDeviceWithShareID: Waiting on done channel")
var finalErr error
select {
case <-done:
finalErr = <-streamErrChan
t.Log("HealthChangeForDeviceWithShareID: done channel closed, stream goroutine finished.")
case <-time.After(1 * time.Second):
t.Fatal("HealthChangeForDeviceWithShareID: Timed out waiting for HandleWatchResourcesStream to finish")
}
assert.True(t, finalErr == nil || errors.Is(finalErr, io.EOF), "Expected nil or io.EOF, got %v", finalErr)
})
// Test Case 7: Health change affects multiple pods sharing the same device via ShareID
// When two pods share the same physical device (identified by pool/device, but with different ShareIDs),
// a health change should notify both pods since they're using the same underlying hardware.
t.Run("HealthChangeForMultiplePodsWithSharedDevice", func(t *testing.T) {
stCtx, stCancel := context.WithCancel(overallTestCtx)
defer stCancel()
// Setup: Two claims from different pods, both using the same device with different ShareIDs
pod1UID := types.UID("pod-1-sharing-device")
pod2UID := types.UID("pod-2-sharing-device")
claim1UID := types.UID("claim-1-shared")
claim2UID := types.UID("claim-2-shared")
shareUID1 := types.UID("share-uid-1")
shareUID2 := types.UID("share-uid-2")
// Both claims reference the same physical device (same pool/device) but with different ShareIDs
claim1 := &ClaimInfo{
ClaimInfoState: state.ClaimInfoState{
ClaimUID: claim1UID,
ClaimName: "shared-claim-1",
Namespace: namespace,
PodUIDs: sets.New[string](string(pod1UID)),
DriverState: map[string]state.DriverState{
driverName: {
Devices: []state.Device{{
PoolName: poolName,
DeviceName: deviceName, // Same device as claim2
ShareID: &shareUID1,
RequestNames: []string{requestName},
CDIDeviceIDs: []string{cdiID},
}},
},
},
},
prepared: true,
}
claim2 := &ClaimInfo{
ClaimInfoState: state.ClaimInfoState{
ClaimUID: claim2UID,
ClaimName: "shared-claim-2",
Namespace: namespace,
PodUIDs: sets.New[string](string(pod2UID)),
DriverState: map[string]state.DriverState{
driverName: {
Devices: []state.Device{{
PoolName: poolName,
DeviceName: deviceName, // Same device as claim1
ShareID: &shareUID2,
RequestNames: []string{requestName},
CDIDeviceIDs: []string{cdiID},
}},
},
},
},
prepared: true,
}
manager, runStreamTest := setupNewManagerAndRunStreamTest(t, stCtx, claim1, claim2)
t.Log("HealthChangeForMultiplePodsWithSharedDevice: Test Case Started")
responses := make(chan struct {
Resp *drahealthv1alpha1.NodeWatchResourcesResponse
Err error
}, 1)
updateChan, done, streamErrChan := runStreamTest(stCtx, responses)
// Send health update for the shared device
unhealthyDeviceMsg := &drahealthv1alpha1.DeviceHealth{
Device: &drahealthv1alpha1.DeviceIdentifier{
PoolName: poolName,
DeviceName: deviceName,
},
Health: drahealthv1alpha1.HealthStatus_UNHEALTHY,
LastUpdatedTime: time.Now().Unix(),
}
t.Logf("HealthChangeForMultiplePodsWithSharedDevice: Sending health update: %+v", unhealthyDeviceMsg)
responses <- struct {
Resp *drahealthv1alpha1.NodeWatchResourcesResponse
Err error
}{
Resp: &drahealthv1alpha1.NodeWatchResourcesResponse{Devices: []*drahealthv1alpha1.DeviceHealth{unhealthyDeviceMsg}},
}
t.Log("HealthChangeForMultiplePodsWithSharedDevice: Waiting for update on manager channel")
select {
case upd := <-updateChan:
t.Logf("HealthChangeForMultiplePodsWithSharedDevice: Received update: %+v", upd)
// Both pods should be notified since they share the same device
assert.Len(t, upd.PodUIDs, 2, "Both pods sharing the device should be notified")
assert.Contains(t, upd.PodUIDs, string(pod1UID), "Pod 1 should be notified")
assert.Contains(t, upd.PodUIDs, string(pod2UID), "Pod 2 should be notified")
case <-time.After(2 * time.Second):
t.Fatal("HealthChangeForMultiplePodsWithSharedDevice: Timeout waiting for pod update")
}
// Verify health cache is updated
cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName)
assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Health cache should show Unhealthy for shared device")
t.Log("HealthChangeForMultiplePodsWithSharedDevice: Closing responses channel to signal EOF")
close(responses)
var finalErr error
select {
case <-done:
finalErr = <-streamErrChan
t.Log("HealthChangeForMultiplePodsWithSharedDevice: done channel closed")
case <-time.After(1 * time.Second):
t.Fatal("HealthChangeForMultiplePodsWithSharedDevice: Timed out waiting for stream to finish")
}
assert.True(t, finalErr == nil || errors.Is(finalErr, io.EOF), "Expected nil or io.EOF, got %v", finalErr)
})
}
// TestUpdateAllocatedResourcesStatus verifies that the manager can correctly
@ -2029,6 +2217,51 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
},
},
},
// Test case for ShareID: Verifies that health status is correctly reported for devices with ShareID set.
// This validates that KEP-4860 (ShareID) does not interfere with KEP-4680 (Resource Health Status).
{
name: "Claim with ShareID",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pod-shareid", UID: "pod-shareid-uid"},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Name: "container1", Resources: v1.ResourceRequirements{Claims: []v1.ResourceClaim{{Name: "shareid-claim"}}}},
},
ResourceClaims: []v1.PodResourceClaim{
{Name: "shareid-claim", ResourceClaimName: ptr.To("shareid-claim-object")},
},
},
Status: v1.PodStatus{
ResourceClaimStatuses: []v1.PodResourceClaimStatus{
{Name: "shareid-claim", ResourceClaimName: ptr.To("shareid-claim-object")},
},
},
},
claimInfos: []*ClaimInfo{
{
ClaimInfoState: state.ClaimInfoState{
ClaimName: "shareid-claim-object",
PodUIDs: sets.New("pod-shareid-uid"),
DriverState: map[string]state.DriverState{
"test-driver": {Devices: []state.Device{{
PoolName: "pool",
DeviceName: "dev-shared",
ShareID: ptr.To(types.UID("test-share-id")), // Device has ShareID
}}},
},
},
},
},
initialStatus: &v1.PodStatus{ContainerStatuses: []v1.ContainerStatus{{Name: "container1"}}},
expectedAllocatedResourcesStatus: []v1.ResourceStatus{
{
Name: "claim:shareid-claim",
Resources: []v1.ResourceHealth{
{ResourceID: "test-driver/pool/dev-shared", Health: v1.ResourceHealthStatusHealthy},
},
},
},
},
}
for _, tc := range testCases {

View file

@ -1189,6 +1189,80 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
}).WithContext(ctx).WithTimeout(30*time.Second).WithPolling(2*time.Second).Should(gomega.Succeed(), "The allocatedResourcesStatus field should be absent when the feature gate is disabled")
})
// These tests validate that health status reporting works correctly for devices
// allocated with ShareID, ensuring both features integrate properly.
f.Context("Resource Health with ShareID", framework.WithFeatureGate(features.ResourceHealthStatus), framework.WithFeatureGate(features.DRAConsumableCapacity), f.WithSerial(), func() {
ginkgo.BeforeEach(func() {
// Skip if feature gates are already enabled (we need to enable them ourselves)
if e2eskipper.IsFeatureGateEnabled(features.ResourceHealthStatus) {
e2eskipper.Skipf("feature %s is already enabled", features.ResourceHealthStatus)
}
if e2eskipper.IsFeatureGateEnabled(features.DRAConsumableCapacity) {
e2eskipper.Skipf("feature %s is already enabled", features.DRAConsumableCapacity)
}
})
// Verifies that device health transitions are correctly reflected in Pod status
// for devices allocated with ShareID.
ginkgo.It("should reflect device health changes for devices with ShareID", func(ctx context.Context) {
ginkgo.By("Starting the test driver")
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, f.Namespace.Name, getNodeName(ctx, f), driverName)
ginkgo.By("wait for registration to complete")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
poolName := "shareid-health-pool"
deviceName := "shareid-health-device"
testShareID := uuid.NewUUID()
// Create test objects with ShareID in the allocation result
pod := createTestObjectsWithShareID(ctx, f, driverName, "shareid-health-class", "shareid-health-claim", "shareid-health-pod", poolName, deviceName, &testShareID)
ginkgo.By("wait for NodePrepareResources call to succeed")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(retryTestTimeout).Should(testdrivergomega.NodePrepareResourcesSucceeded)
ginkgo.By("wait for pod to be running")
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod))
ginkgo.By("Setting device health to Healthy via control channel")
kubeletPlugin.HealthControlChan <- testdriver.DeviceHealthUpdate{
PoolName: poolName,
DeviceName: deviceName,
Health: "Healthy",
}
ginkgo.By("Verifying device health is Healthy in the pod status")
gomega.Eventually(ctx, func(ctx context.Context) (string, error) {
return getDeviceHealthFromAPIServer(f, pod.Namespace, pod.Name, driverName, "shareid-health-claim", poolName, deviceName)
}).WithTimeout(60*time.Second).WithPolling(2*time.Second).Should(gomega.Equal("Healthy"), "Device with ShareID should show Healthy status")
ginkgo.By("Setting device health to Unhealthy via control channel")
kubeletPlugin.HealthControlChan <- testdriver.DeviceHealthUpdate{
PoolName: poolName,
DeviceName: deviceName,
Health: "Unhealthy",
}
ginkgo.By("Verifying device health is now Unhealthy in the pod status")
gomega.Eventually(ctx, func(ctx context.Context) (string, error) {
return getDeviceHealthFromAPIServer(f, pod.Namespace, pod.Name, driverName, "shareid-health-claim", poolName, deviceName)
}).WithTimeout(60*time.Second).WithPolling(2*time.Second).Should(gomega.Equal("Unhealthy"), "Device with ShareID should update to Unhealthy")
ginkgo.By("Setting device health back to Healthy")
kubeletPlugin.HealthControlChan <- testdriver.DeviceHealthUpdate{
PoolName: poolName,
DeviceName: deviceName,
Health: "Healthy",
}
ginkgo.By("Verifying device health has recovered to Healthy")
gomega.Eventually(ctx, func(ctx context.Context) (string, error) {
return getDeviceHealthFromAPIServer(f, pod.Namespace, pod.Name, driverName, "shareid-health-claim", poolName, deviceName)
}).WithTimeout(60*time.Second).WithPolling(2*time.Second).Should(gomega.Equal("Healthy"), "Device with ShareID should recover to Healthy")
})
})
f.Context("Device ShareID", framework.WithFeatureGate(features.DRAConsumableCapacity), f.WithSerial(), func() {
ginkgo.BeforeEach(func() {
@ -1839,6 +1913,122 @@ func setupAndVerifyHealthyPod(
return pod, claimName, poolName, deviceName
}
// createTestObjectsWithShareID creates test objects (DeviceClass, ResourceClaim, Pod) for testing
// ShareID with health status integration. The ShareID is included in the device allocation result.
func createTestObjectsWithShareID(ctx context.Context, f *framework.Framework, driverName, className, claimName, podName, poolName, deviceName string, shareID *apitypes.UID) *v1.Pod {
ginkgo.By(fmt.Sprintf("Creating DeviceClass %q", className))
dc := &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
}
_, err := f.ClientSet.ResourceV1().DeviceClasses().Create(ctx, dc, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create DeviceClass "+className)
ginkgo.DeferCleanup(func(ctx context.Context) {
err := f.ClientSet.ResourceV1().DeviceClasses().Delete(ctx, className, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
framework.Failf("Failed to delete DeviceClass %s: %v", className, err)
}
})
ginkgo.By(fmt.Sprintf("Creating ResourceClaim %q", claimName))
claim := &resourceapi.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{
Name: claimName,
},
Spec: resourceapi.ResourceClaimSpec{
Devices: resourceapi.DeviceClaim{
Requests: []resourceapi.DeviceRequest{{
Name: "my-request",
Exactly: &resourceapi.ExactDeviceRequest{
DeviceClassName: className,
},
}},
},
},
}
_, err = f.ClientSet.ResourceV1().ResourceClaims(f.Namespace.Name).Create(ctx, claim, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create ResourceClaim "+claimName)
ginkgo.DeferCleanup(func(ctx context.Context) {
err := f.ClientSet.ResourceV1().ResourceClaims(f.Namespace.Name).Delete(ctx, claimName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
framework.Failf("Failed to delete ResourceClaim %s: %v", claimName, err)
}
})
ginkgo.By(fmt.Sprintf("Creating long-running Pod %q", podName))
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: f.Namespace.Name,
},
Spec: v1.PodSpec{
NodeName: getNodeName(ctx, f),
RestartPolicy: v1.RestartPolicyNever,
ResourceClaims: []v1.PodResourceClaim{
{Name: claimName, ResourceClaimName: &claimName},
},
Containers: []v1.Container{
{
Name: "testcontainer",
Image: e2epod.GetDefaultTestImage(),
Command: []string{"/bin/sh", "-c", "sleep 600"},
Resources: v1.ResourceRequirements{
Claims: []v1.ResourceClaim{{Name: claimName, Request: "my-request"}},
},
},
},
},
}
createdPod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create Pod "+podName)
ginkgo.DeferCleanup(func(ctx context.Context) {
e2epod.DeletePodOrFail(ctx, f.ClientSet, createdPod.Namespace, createdPod.Name)
})
// Patch the Pod's status to include the ResourceClaimStatuses
patch, err := json.Marshal(v1.Pod{
Status: v1.PodStatus{
ResourceClaimStatuses: []v1.PodResourceClaimStatus{
{Name: claimName, ResourceClaimName: &claimName},
},
},
})
framework.ExpectNoError(err, "failed to marshal patch for Pod status")
_, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Patch(ctx, createdPod.Name, apitypes.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status")
framework.ExpectNoError(err, "failed to patch Pod status with ResourceClaimStatuses")
ginkgo.By(fmt.Sprintf("Allocating claim %q with ShareID", claimName))
claimToUpdate, err := f.ClientSet.ResourceV1().ResourceClaims(f.Namespace.Name).Get(ctx, claimName, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to get latest version of ResourceClaim "+claimName)
// Update the claims status with ShareID in the allocation result
claimToUpdate.Status = resourceapi.ResourceClaimStatus{
ReservedFor: []resourceapi.ResourceClaimConsumerReference{
{Resource: "pods", Name: createdPod.Name, UID: createdPod.UID},
},
Allocation: &resourceapi.AllocationResult{
Devices: resourceapi.DeviceAllocationResult{
Results: []resourceapi.DeviceRequestAllocationResult{
{
Driver: driverName,
Pool: poolName,
Device: deviceName,
Request: "my-request",
ShareID: shareID, // Include ShareID in allocation
},
},
},
},
}
_, err = f.ClientSet.ResourceV1().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claimToUpdate, metav1.UpdateOptions{})
framework.ExpectNoError(err, "failed to update ResourceClaim status with ShareID")
return createdPod
}
// errorOnCloseListener is a mock net.Listener that blocks on Accept()
// until Close() is called, at which point Accept() returns a predefined error.
//