From cdcfc4eeb3b5fd6407b2f7dafc315a9f6c6b1677 Mon Sep 17 00:00:00 2001 From: Praveen Krishna Date: Wed, 4 Mar 2026 14:43:54 +0000 Subject: [PATCH] Add integration tests. --- .../nodeallocatabledynamicresources.go | 30 +- test/e2e/node/pod_admission.go | 41 +- test/integration/dra/core.go | 7 +- test/integration/dra/dra.go | 12 +- test/integration/dra/helpers.go | 5 +- .../dra/node_allocatable_resources.go | 669 ++++++++++++++++++ 6 files changed, 726 insertions(+), 38 deletions(-) create mode 100644 test/integration/dra/node_allocatable_resources.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources.go index c07605098ca..31c49165f14 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources.go @@ -24,7 +24,6 @@ import ( v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -92,6 +91,7 @@ func (pl *DynamicResources) calculateAndCheckNodeAllocatableResources(ctx contex totalPodDemand, nodeAllocatableClaimStatus, status := pl.getPodNodeAllocatableResourceFootprint(logger, nodeInfo, pod, state, allocations, nodeAllocatableClaims) if status != nil { + logger.V(5).Info("calculateAndCheckNodeAllocatableResources: getPodNodeAllocatableResourceFootprint failed", "status", status) return nil, status } @@ -290,6 +290,14 @@ func (pl *DynamicResources) getPodNodeAllocatableResourceFootprint(logger klog.L return nil, nil, statusError(logger, err) } + for _, status := range nodeAllocatableStatus { + // TODO(KEP-5517): Evaluate if its ok to have no containers referencing a node allocatable resource claim. + // This is pending on defining kubelet cgroup enforcement. + if len(status.Containers) == 0 { + return nil, nil, fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("claim %s: node-allocatable resource claim not referenced by any container within the pod", status.ResourceClaimName)) + } + } + // Calculate the final totalPodDemand to be used for node fitting optsTotal := resourcehelper.PodResourcesOptions{ SkipPodLevelResources: !pl.fts.EnablePodLevelResources, @@ -401,20 +409,22 @@ func (pl *DynamicResources) patchNodeAllocatableResourceClaimStatus(ctx context. if len(nodeAllocatableClaimStatus) == 0 { return nil } - logger := klog.FromContext(ctx) - podStatusCopy := pod.Status.DeepCopy() - if apiequality.Semantic.DeepEqual(podStatusCopy.NodeAllocatableResourceClaimStatuses, nodeAllocatableClaimStatus) { - logger.V(6).Info("NodeAllocatableResourceClaimStatuses is already up-to-date", "pod", klog.KObj(pod)) - return nil - } - podStatusCopy.NodeAllocatableResourceClaimStatuses = nodeAllocatableClaimStatus + // The incoming 'pod' is from the scheduler cache and would have NodeAllocatableResourceClaimStatus + // pre-populated in the assume phase without persisting to the API server. + // schedutil.PatchPodStatus skips patching if the old and new status are identical. + // To ensure the status is persisted to the API server we clear it in the baseStatus, forcing a patch. + baseStatus := pod.Status.DeepCopy() + baseStatus.NodeAllocatableResourceClaimStatuses = nil - if err := schedutil.PatchPodStatus(ctx, pl.clientset, pod.Name, pod.Namespace, &pod.Status, podStatusCopy); err != nil { + targetStatus := pod.Status.DeepCopy() + + targetStatus.NodeAllocatableResourceClaimStatuses = nodeAllocatableClaimStatus + if err := schedutil.PatchPodStatus(ctx, pl.clientset, pod.Name, pod.Namespace, baseStatus, targetStatus); err != nil { return statusError(logger, fmt.Errorf("updating pod %s/%s NodeAllocatableResourceClaimStatuses: %w", pod.Namespace, pod.Name, err)) } - logger.V(5).Info("Patched pod status with NodeAllocatableResourceClaimStatuses", "pod", klog.KObj(pod), "status", podStatusCopy.NodeAllocatableResourceClaimStatuses) + logger.V(5).Info("Patched pod status with NodeAllocatableResourceClaimStatuses", "pod", klog.KObj(pod), "status", targetStatus.NodeAllocatableResourceClaimStatuses) return nil } diff --git a/test/e2e/node/pod_admission.go b/test/e2e/node/pod_admission.go index cbc828761c7..2e50f7e115c 100644 --- a/test/e2e/node/pod_admission.go +++ b/test/e2e/node/pod_admission.go @@ -111,26 +111,27 @@ var _ = SIGDescribe("PodRejectionStatus", func() { // This detects if there are any new fields in Status that were dropped by the pod rejection. // These new fields either should be kept by kubelet's admission or added explicitly in the list of fields that are having a different value or must be cleared. gomega.Expect(gotPod.Status).To(gstruct.MatchAllFields(gstruct.Fields{ - "ObservedGeneration": gstruct.Ignore(), - "Phase": gstruct.Ignore(), - "Conditions": gstruct.Ignore(), - "Message": gstruct.Ignore(), - "Reason": gstruct.Ignore(), - "NominatedNodeName": gstruct.Ignore(), - "HostIP": gstruct.Ignore(), - "HostIPs": gstruct.Ignore(), - "PodIP": gstruct.Ignore(), - "PodIPs": gstruct.Ignore(), - "StartTime": gstruct.Ignore(), - "InitContainerStatuses": gstruct.Ignore(), - "ContainerStatuses": gstruct.Ignore(), - "QOSClass": gomega.Equal(pod.Status.QOSClass), // QOSClass should be kept - "EphemeralContainerStatuses": gstruct.Ignore(), - "Resize": gstruct.Ignore(), - "ResourceClaimStatuses": gstruct.Ignore(), - "ExtendedResourceClaimStatus": gstruct.Ignore(), - "Resources": gstruct.Ignore(), - "AllocatedResources": gstruct.Ignore(), + "ObservedGeneration": gstruct.Ignore(), + "Phase": gstruct.Ignore(), + "Conditions": gstruct.Ignore(), + "Message": gstruct.Ignore(), + "Reason": gstruct.Ignore(), + "NominatedNodeName": gstruct.Ignore(), + "HostIP": gstruct.Ignore(), + "HostIPs": gstruct.Ignore(), + "PodIP": gstruct.Ignore(), + "PodIPs": gstruct.Ignore(), + "StartTime": gstruct.Ignore(), + "InitContainerStatuses": gstruct.Ignore(), + "ContainerStatuses": gstruct.Ignore(), + "QOSClass": gomega.Equal(pod.Status.QOSClass), // QOSClass should be kept + "EphemeralContainerStatuses": gstruct.Ignore(), + "Resize": gstruct.Ignore(), + "ResourceClaimStatuses": gstruct.Ignore(), + "ExtendedResourceClaimStatus": gstruct.Ignore(), + "Resources": gstruct.Ignore(), + "AllocatedResources": gstruct.Ignore(), + "NodeAllocatableResourceClaimStatuses": gstruct.Ignore(), })) }) }) diff --git a/test/integration/dra/core.go b/test/integration/dra/core.go index 50edadf73cc..fe720728f04 100644 --- a/test/integration/dra/core.go +++ b/test/integration/dra/core.go @@ -336,9 +336,10 @@ func testPublishResourceSlices(tCtx ktesting.TContext, haveLatestAPI bool, disab } return expected }()...), - "BindingConditions": gomega.Equal(device.BindingConditions), - "BindingFailureConditions": gomega.Equal(device.BindingFailureConditions), - "BindsToNode": gomega.Equal(device.BindsToNode), + "BindingConditions": gomega.Equal(device.BindingConditions), + "BindingFailureConditions": gomega.Equal(device.BindingFailureConditions), + "BindsToNode": gomega.Equal(device.BindsToNode), + "NodeAllocatableResourceMappings": gomega.Equal(device.NodeAllocatableResourceMappings), })) } return expected diff --git a/test/integration/dra/dra.go b/test/integration/dra/dra.go index 1a0cc51819c..68edc0b5390 100644 --- a/test/integration/dra/dra.go +++ b/test/integration/dra/dra.go @@ -87,8 +87,10 @@ var ( ) const ( - numNodes = 8 - maxPodsPerNode = 5000 // This should never be the limiting factor, no matter how many tests run in parallel. + numNodes = 8 + maxPodsPerNode = 5000 // This should never be the limiting factor, no matter how many tests run in parallel. + nodeCPUCapacity = "100" + nodeMemoryCapacity = "1k" // schedulingTimeout is the time we grant the scheduler for one scheduling attempt, // whether it's successful or not. @@ -220,6 +222,7 @@ func run(tCtx ktesting.TContext, whatRE string) { features.DRAPrioritizedList: true, features.DRAResourceClaimDeviceStatus: true, features.DRAExtendedResource: true, + features.DRANodeAllocatableResources: true, }, f: func(tCtx ktesting.TContext) { // These tests must run in parallel as much as possible to keep overall runtime low! @@ -246,6 +249,7 @@ func run(tCtx ktesting.TContext, whatRE string) { runSubTest(tCtx, "FilterTimeout", func(tCtx ktesting.TContext) { testFilterTimeout(tCtx, 21) }) runSubTest(tCtx, "ShareResourceClaimSequentially", testShareResourceClaimSequentially) runSubTest(tCtx, "UsesAllResources", testUsesAllResources) + runSubTest(tCtx, "DRANodeAllocatableResources", func(tCtx ktesting.TContext) { testNodeAllocatableResources(tCtx, true) }) }, }, } { @@ -368,8 +372,8 @@ func createNodes(tCtx ktesting.TContext) { // Make the node ready. node.Status = v1.NodeStatus{ Capacity: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100"), - v1.ResourceMemory: resource.MustParse("1000"), + v1.ResourceCPU: resource.MustParse(nodeCPUCapacity), + v1.ResourceMemory: resource.MustParse(nodeMemoryCapacity), v1.ResourcePods: *resource.NewScaledQuantity(maxPodsPerNode, 0), }, Phase: v1.NodeRunning, diff --git a/test/integration/dra/helpers.go b/test/integration/dra/helpers.go index 96f6967fe33..f8118acd3c6 100644 --- a/test/integration/dra/helpers.go +++ b/test/integration/dra/helpers.go @@ -191,7 +191,10 @@ var createPodInternal = func(tCtx ktesting.TContext, namespace string, suffix st container.Resources.Limits[res] = resource.MustParse(qty) } } - pod.Spec.ResourceClaims = resourceClaims + if len(claims) > 0 { + // Update the field only if claims are passed. + pod.Spec.ResourceClaims = resourceClaims + } pod, err := tCtx.Client().CoreV1().Pods(namespace).Create(tCtx, pod, metav1.CreateOptions{}) tCtx.ExpectNoError(err, "create pod "+podName) tCtx.CleanupCtx(func(tCtx ktesting.TContext) { diff --git a/test/integration/dra/node_allocatable_resources.go b/test/integration/dra/node_allocatable_resources.go new file mode 100644 index 00000000000..b021a74eaca --- /dev/null +++ b/test/integration/dra/node_allocatable_resources.go @@ -0,0 +1,669 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "fmt" + "time" + + "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" + gtypes "github.com/onsi/gomega/types" + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + st "k8s.io/kubernetes/pkg/scheduler/testing" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/utils/ptr" +) + +type testEnv struct { + namespace string + nodeName string + class *resourceapi.DeviceClass + driverName string + poolName string +} + +func testNodeAllocatableResources(tCtx ktesting.TContext, enabled bool) { + if !enabled { + return + } + tCtx.Run("ConsumablePool", testNodeAllocatableResourcesConsumablePool) + tCtx.Run("IndividualDevices", testNodeAllocatableResourcesIndividualDevices) + tCtx.Run("ClaimSharing", testNodeAllocatableResourceClaimSharing) + tCtx.Run("PodLevelResourceValidation", testPodLevelResourceValidation) + tCtx.Run("InsufficientNodeResources", testInsufficientNodeResources) + tCtx.Run("ClaimTemplateBasedAllocation", testNodeAllocatableResourcesWithClaimTemplate) + tCtx.Run("UnreferencedClaimInPod", testNodeAllocatableUnreferencedClaimInPod) +} + +func setupTestEnv(tCtx ktesting.TContext, nodeNum int) *testEnv { + + nodeName := fmt.Sprintf("worker-%d", nodeNum) + namespace := createTestNamespace(tCtx, nil) + class, driverName := createTestClass(tCtx, namespace) + poolName := namespace + "-pool" + + return &testEnv{ + namespace: namespace, + nodeName: nodeName, + class: class, + driverName: driverName, + poolName: poolName, + } +} + +func createSliceAndStartScheduler(tCtx ktesting.TContext, slice *resourceapi.ResourceSlice) { + tCtx.Helper() + createSlice(tCtx, slice) + startScheduler(tCtx) +} + +func makeSlice(env *testEnv, devices []resourceapi.Device) *resourceapi.ResourceSlice { + return &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: env.namespace + "-", + }, + Spec: resourceapi.ResourceSliceSpec{ + NodeName: &env.nodeName, + Pool: resourceapi.ResourcePool{ + Name: env.poolName, + ResourceSliceCount: 1, + }, + Driver: env.driverName, + Devices: devices, + }, + } +} + +func expectPodUnschedulable(tCtx ktesting.TContext, pod *v1.Pod, reason string) { + tCtx.Helper() + tCtx.ExpectNoError(e2epod.WaitForPodNameUnschedulableInNamespace(tCtx, tCtx.Client(), pod.Name, pod.Namespace), fmt.Sprintf("expected pod to be unschedulable because %q", reason)) + pod, err := tCtx.Client().CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{}) + tCtx.ExpectNoError(err) + gomega.NewWithT(tCtx).Expect(pod).To(gomega.HaveField("Status.Conditions", gomega.ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Type": gomega.Equal(v1.PodScheduled), + "Status": gomega.Equal(v1.ConditionFalse), + "Reason": gomega.Equal(v1.PodReasonUnschedulable), + "Message": gomega.ContainSubstring(reason), + })))) +} + +func verifyPodNodeAllocatableStatus(tCtx ktesting.TContext, namespace, podName string, expectedStatus []v1.NodeAllocatableResourceClaimStatus) { + tCtx.Helper() + var statusMatchers []gtypes.GomegaMatcher + for _, expected := range expectedStatus { + statusMatchers = append(statusMatchers, gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "ResourceClaimName": gomega.ContainSubstring(expected.ResourceClaimName), + "Containers": gomega.Equal(expected.Containers), + "Resources": gomega.Equal(expected.Resources), + })) + } + tCtx.Eventually(func(tCtx ktesting.TContext) []v1.NodeAllocatableResourceClaimStatus { + pod, err := tCtx.Client().CoreV1().Pods(namespace).Get(tCtx, podName, metav1.GetOptions{}) + if err != nil { + tCtx.Logf("Error getting pod: %v", err) + return nil + } + return pod.Status.NodeAllocatableResourceClaimStatuses + }).WithTimeout(30*time.Second).WithPolling(200*time.Millisecond).Should(gomega.ConsistOf(statusMatchers), "pod node allocatable resource claim status") +} + +func createPodWithNodeAllocatableClaim(tCtx ktesting.TContext, env *testEnv, claimName, podName string, numContainers int) (*v1.Pod, *resourceapi.ResourceClaim) { + tCtx.Helper() + // Create a ResourceClaim for the node allocatable resource class + claim := st.MakeResourceClaim(). + Name(claimName). + Namespace(env.namespace). + Request(env.class.Name). + Obj() + claim = createClaim(tCtx, env.namespace, "", env.class, claim) + + // Create a Pod that uses the node allocatable claim + pod := st.MakePod().Name(podName).Namespace(env.namespace).Obj() + + containers := make([]v1.Container, 0, numContainers) + for i := range numContainers { + containers = append(containers, v1.Container{ + Name: fmt.Sprintf("my-container-%d", i+1), + Image: "test-image", + Resources: v1.ResourceRequirements{ + Claims: []v1.ResourceClaim{ + {Name: claimName}, + }, + }, + }) + } + pod.Spec.Containers = containers + pod.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": env.nodeName} + + // createPod will generate the pod.Spec.ResourceClaims entry + pod = createPod(tCtx, env.namespace, "", pod, claim) + return pod, claim +} + +func testNodeAllocatableResourcesConsumablePool(tCtx ktesting.TContext) { + tCtx.Parallel() + env := setupTestEnv(tCtx, 0) + + cpuCapacityKey := resourceapi.QualifiedName("dra.example.com/cpu") + memCapacityKey := resourceapi.QualifiedName("dra.example.com/memory") + + devices := []resourceapi.Device{ + { + Name: "node-allocatable-device-0", + AllowMultipleAllocations: ptr.To(true), + Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + cpuCapacityKey: {Value: resource.MustParse(nodeCPUCapacity)}, + memCapacityKey: {Value: resource.MustParse(nodeMemoryCapacity)}, + }, + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {CapacityKey: &cpuCapacityKey}, + v1.ResourceMemory: {CapacityKey: &memCapacityKey}, + }, + }, + } + slice := makeSlice(env, devices) + env.poolName = env.namespace + "-node-allocatable-pool" // Override poolName for this test + slice.Spec.Pool.Name = env.poolName + createSliceAndStartScheduler(tCtx, slice) + + podName := "test-pod-consumable-claim" + claimName := "node-allocatable-claim-consumable" + pod, claim := createPodWithNodeAllocatableClaim(tCtx, env, claimName, podName, 2) + + waitForPodScheduled(tCtx, env.namespace, pod.Name) + allocatedClaim := waitForClaimAllocatedToDevice(tCtx, env.namespace, claim.Name, schedulingTimeout) + + gomega.NewWithT(tCtx).Expect(allocatedClaim).To( + gomega.HaveField("Status.Allocation", gstruct.PointTo( + gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Devices": gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Results": gomega.ConsistOf( + gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Request": gomega.Equal(claim.Spec.Devices.Requests[0].Name), + "Driver": gomega.Equal(env.driverName), + "Pool": gomega.Equal(env.poolName), + "Device": gomega.Equal("node-allocatable-device-0"), + "ConsumedCapacity": gomega.Equal(map[resourceapi.QualifiedName]resource.Quantity{ + cpuCapacityKey: resource.MustParse(nodeCPUCapacity), + memCapacityKey: resource.MustParse(nodeMemoryCapacity), + }), + "ShareID": gomega.Not(gomega.BeNil()), + }), + ), + }), + }), + )), + "node allocatable claim allocation", + ) + + expectedStatus := []v1.NodeAllocatableResourceClaimStatus{{ + ResourceClaimName: claim.Name, + Containers: []string{"my-container-1", "my-container-2"}, + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse(nodeCPUCapacity), + v1.ResourceMemory: resource.MustParse(nodeMemoryCapacity), + }, + }} + verifyPodNodeAllocatableStatus(tCtx, env.namespace, pod.Name, expectedStatus) + + anotherPod := st.MakePod().Name("another-pod").Namespace(env.namespace).Obj() + anotherPod.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": env.nodeName} + anotherPod.Spec.Containers = []v1.Container{ + { + Name: "test-container", + Image: "test-image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")}, + }, + }, + } + anotherPod = createPod(tCtx, env.namespace, "", anotherPod) + expectPodUnschedulable(tCtx, anotherPod, "Insufficient cpu") +} + +func testNodeAllocatableResourcesIndividualDevices(tCtx ktesting.TContext) { + tCtx.Parallel() + env := setupTestEnv(tCtx, 1) + + numCPUsPerDevice := resource.MustParse("16") + devices := []resourceapi.Device{ + { + Name: "numa-0-cpus", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &numCPUsPerDevice}, + }, + }, + { + Name: "numa-1-cpus", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &numCPUsPerDevice}, + }, + }, + } + slice := makeSlice(env, devices) + env.poolName = env.namespace + "-node-allocatable-pool" // Override poolName for this test + slice.Spec.Pool.Name = env.poolName + createSliceAndStartScheduler(tCtx, slice) + + podName := "test-pod-individual" + claimName := "node-allocatable-claim-individual" + pod, claim := createPodWithNodeAllocatableClaim(tCtx, env, claimName, podName, 2) + + waitForPodScheduled(tCtx, env.namespace, pod.Name) + allocatedClaim := waitForClaimAllocatedToDevice(tCtx, env.namespace, claim.Name, schedulingTimeout) + + gomega.NewWithT(tCtx).Expect(allocatedClaim).To( + gomega.HaveField("Status.Allocation", gstruct.PointTo( + gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Devices": gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Results": gomega.ConsistOf( + gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Request": gomega.Equal(claim.Spec.Devices.Requests[0].Name), + "Driver": gomega.Equal(env.driverName), + "Pool": gomega.Equal(env.poolName), + "Device": gomega.MatchRegexp("numa-[0-1]+-cpus"), + }), + ), + }), + }), + )), + "node allocatable claim allocation", + ) + + expectedStatus := []v1.NodeAllocatableResourceClaimStatus{{ + ResourceClaimName: claim.Name, + Containers: []string{"my-container-1", "my-container-2"}, + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: numCPUsPerDevice, + }, + }} + verifyPodNodeAllocatableStatus(tCtx, env.namespace, pod.Name, expectedStatus) +} + +func testNodeAllocatableResourceClaimSharing(tCtx ktesting.TContext) { + tCtx.Parallel() + env := setupTestEnv(tCtx, 2) + + cpuMultiplier := resource.MustParse("1") + devices := []resourceapi.Device{ + { + Name: "dev-sharetest", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &cpuMultiplier}, + }, + }, + } + createSliceAndStartScheduler(tCtx, makeSlice(env, devices)) + + pod1, claim := createPodWithNodeAllocatableClaim(tCtx, env, "node-allocatable-claim", "pod1", 2) + waitForPodScheduled(tCtx, env.namespace, pod1.Name) + _ = waitForClaimAllocatedToDevice(tCtx, env.namespace, claim.Name, schedulingTimeout) + + // Pod 2 - Should NOT schedule as the claim is already used on this node + pod2 := st.MakePod().Name("pod2").Namespace(env.namespace).Obj() + container1 := v1.Container{ + Name: "c1", + Image: "test-image", + Resources: v1.ResourceRequirements{ + Claims: []v1.ResourceClaim{ + {Name: claim.Name}, // USE EXISTING CLAIM + }, + }, + } + pod2.Spec.Containers = []v1.Container{container1} + pod2.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": env.nodeName} + pod2 = createPod(tCtx, env.namespace, "", pod2, claim) // Pass the existing claim + expectPodUnschedulable(tCtx, pod2, "is already used by another pod") +} + +func testPodLevelResourceValidation(tCtx ktesting.TContext) { + tCtx.Parallel() + env := setupTestEnv(tCtx, 3) + + cpuMultiplier := resource.MustParse("4") + devices := []resourceapi.Device{ + { + Name: "dev0", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &cpuMultiplier}, + }, + }, + { + Name: "dev1", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &cpuMultiplier}, + }, + }, + } + createSliceAndStartScheduler(tCtx, makeSlice(env, devices)) + + tCtx.Run("SufficientPodLevelRequest", func(tCtx ktesting.TContext) { + claimName := "claim-podlevel-sufficient" + podName := "pod-podlevel-sufficient" + claim := st.MakeResourceClaim(). + Name(claimName). + Namespace(env.namespace). + Request(env.class.Name). + Obj() + claim = createClaim(tCtx, env.namespace, "", env.class, claim) + + pod := st.MakePod().Name(podName).Namespace(env.namespace).Obj() + pod.Spec.Resources = &v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), // 2 (container) + 4 (DRA) < 10 + }, + } + container1 := v1.Container{ + Name: "c1", + Image: "test-image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + }, + Claims: []v1.ResourceClaim{ + {Name: claimName}, + }, + }, + } + pod.Spec.Containers = []v1.Container{container1} + pod.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": env.nodeName} + pod = createPod(tCtx, env.namespace, "", pod, claim) + + waitForPodScheduled(tCtx, env.namespace, pod.Name) + }) + + tCtx.Run("InsufficientPodLevelRequest", func(tCtx ktesting.TContext) { + claimName := "claim-podlevel-insufficient" + podName := "pod-podlevel-insufficient" + claim := st.MakeResourceClaim(). + Name(claimName). + Namespace(env.namespace). + Request(env.class.Name). + Obj() + claim = createClaim(tCtx, env.namespace, "", env.class, claim) + + pod := st.MakePod().Name(podName).Namespace(env.namespace).Obj() + pod.Spec.Resources = &v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5"), // 2 (container) + 4 (DRA) > 5 + }, + } + container1 := v1.Container{ + Name: "c1", + Image: "test-image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + }, + Claims: []v1.ResourceClaim{ + {Name: claimName}, + }, + }, + } + pod.Spec.Containers = []v1.Container{container1} + pod.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": env.nodeName} + pod = createPod(tCtx, env.namespace, "", pod, claim) + + expectPodUnschedulable(tCtx, pod, "pod level request for cpu is insufficient to cover the aggregated container and node-allocatable DRA requests") + }) +} + +func testInsufficientNodeResources(tCtx ktesting.TContext) { + tCtx.Parallel() + env := setupTestEnv(tCtx, 4) + + tCtx.Run("ExceedNodeResourceCapacityMultiplier", func(tCtx ktesting.TContext) { + cpuMultiplier := resource.MustParse(nodeCPUCapacity) + cpuMultiplier.Add(resource.MustParse("1")) + + devices := []resourceapi.Device{ + { + Name: "dev-exceed-cpu", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &cpuMultiplier}, + }, + }, + } + createSliceAndStartScheduler(tCtx, makeSlice(env, devices)) + + pod, _ := createPodWithNodeAllocatableClaim(tCtx, env, "claim-exceed-cpu", "pod-exceed-cpu", 1) + expectPodUnschedulable(tCtx, pod, "Insufficient cpu") + }) + + tCtx.Run("ExceedNodeCapacityWithCapacityKey", func(tCtx ktesting.TContext) { + cpuCapacityKey := resourceapi.QualifiedName("dra.example.com/cpu") + exceedCPU := resource.MustParse(nodeCPUCapacity) + exceedCPU.Add(resource.MustParse("1")) + + devices := []resourceapi.Device{ + { + Name: "dev-exceed-cpu-capkey", + Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + cpuCapacityKey: {Value: exceedCPU}, + }, + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {CapacityKey: &cpuCapacityKey}, + }, + AllowMultipleAllocations: ptr.To(true), + }, + } + createSliceAndStartScheduler(tCtx, makeSlice(env, devices)) + + pod, _ := createPodWithNodeAllocatableClaim(tCtx, env, "claim-exceed-cpu-capkey", "pod-exceed-cpu-capkey", 1) + expectPodUnschedulable(tCtx, pod, "Insufficient cpu") + }) + + tCtx.Run("StandardPodFailsDueToInsufficientResourcesAfterDRAConsumption", func(tCtx ktesting.TContext) { + cpuMultiplier := resource.MustParse("98") // Consume most of the CPU + devices := []resourceapi.Device{ + { + Name: "dev-consume-most", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &cpuMultiplier}, + }, + }, + } + createSliceAndStartScheduler(tCtx, makeSlice(env, devices)) + + pod1, _ := createPodWithNodeAllocatableClaim(tCtx, env, "claim-consume-most", "pod-consume-most", 1) + waitForPodScheduled(tCtx, env.namespace, pod1.Name) + waitForClaimAllocatedToDevice(tCtx, env.namespace, "claim-consume-most", schedulingTimeout) + + pod2 := st.MakePod().Name("pod-standard-fails").Namespace(env.namespace).Obj() + pod2.Spec.Containers = []v1.Container{ + { + Name: "c2", + Image: "test-image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5"), + }, + }, + }, + } + pod2.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": env.nodeName} + pod2 = createPod(tCtx, env.namespace, "-std-fails", pod2) + expectPodUnschedulable(tCtx, pod2, "Insufficient cpu") + }) + + tCtx.Run("StandardPodSucceedsAfterDRA", func(tCtx ktesting.TContext) { + cpuMultiplier := resource.MustParse("50") // Consume half of the CPU + devices := []resourceapi.Device{ + { + Name: "dev-consume-half", + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {AllocationMultiplier: &cpuMultiplier}, + }, + }, + } + createSliceAndStartScheduler(tCtx, makeSlice(env, devices)) + + pod1, _ := createPodWithNodeAllocatableClaim(tCtx, env, "claim-consume-half", "pod-consume-half", 1) + waitForPodScheduled(tCtx, env.namespace, pod1.Name) + waitForClaimAllocatedToDevice(tCtx, env.namespace, "claim-consume-half", schedulingTimeout) + + pod2 := st.MakePod().Name("pod-standard-succeeds").Namespace(env.namespace).Obj() + pod2.Spec.Containers = []v1.Container{ + { + Name: "c2", + Image: "test-image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5"), + }, + }, + }, + } + pod2.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": env.nodeName} + pod2 = createPod(tCtx, env.namespace, "-std-succeeds", pod2) + waitForPodScheduled(tCtx, env.namespace, pod2.Name) + }) +} + +func testNodeAllocatableResourcesWithClaimTemplate(tCtx ktesting.TContext) { + tCtx.Parallel() + env := setupTestEnv(tCtx, 6) // Use a different node index + + cpuCapacityKey := resourceapi.QualifiedName("dra.example.com/cpu") + memCapacityKey := resourceapi.QualifiedName("dra.example.com/memory") + + devices := []resourceapi.Device{ + { + Name: "dev0", + AllowMultipleAllocations: ptr.To(true), + Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + cpuCapacityKey: {Value: resource.MustParse(nodeCPUCapacity)}, + memCapacityKey: {Value: resource.MustParse(nodeMemoryCapacity)}, + }, + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {CapacityKey: &cpuCapacityKey}, + v1.ResourceMemory: {CapacityKey: &memCapacityKey}, + }, + }, + } + slice := makeSlice(env, devices) + env.poolName = env.namespace + "-pool-template" + slice.Spec.Pool.Name = env.poolName + createSliceAndStartScheduler(tCtx, slice) + startClaimController(tCtx) + + templateName := "node-allocatable-resource-template" + podClaimName := "claim1" + podName := "pod-with-claim-template" + + claimTemplate := &resourceapi.ResourceClaimTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: templateName, + Namespace: env.namespace, + }, + Spec: resourceapi.ResourceClaimTemplateSpec{ + Spec: resourceapi.ResourceClaimSpec{ + Devices: resourceapi.DeviceClaim{ + Requests: []resourceapi.DeviceRequest{ + { + Name: "req", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: env.class.Name, + Capacity: &resourceapi.CapacityRequirements{ + Requests: map[resourceapi.QualifiedName]resource.Quantity{ + cpuCapacityKey: resource.MustParse("10"), + memCapacityKey: resource.MustParse("100"), + }, + }, + }, + }, + }, + }, + }, + }, + } + _, err := tCtx.Client().ResourceV1().ResourceClaimTemplates(env.namespace).Create(tCtx, claimTemplate, metav1.CreateOptions{}) + tCtx.ExpectNoError(err) + + claimTemplateRef := v1.PodResourceClaim{ + Name: podClaimName, + ResourceClaimTemplateName: &claimTemplate.Name, + } + + pod := st.MakePod().Name(podName).Namespace(env.namespace).PodResourceClaims(claimTemplateRef). + Containers([]v1.Container{{Name: "c1", Image: "test", Resources: v1.ResourceRequirements{Claims: []v1.ResourceClaim{{Name: podClaimName}}}}}). + NodeSelector(map[string]string{"kubernetes.io/hostname": env.nodeName}). + Obj() + pod = createPod(tCtx, env.namespace, "", pod) + + waitForPodScheduled(tCtx, env.namespace, pod.Name) + + expectedStatus := []v1.NodeAllocatableResourceClaimStatus{{ + ResourceClaimName: podName, // The genereate claim based on template contains pod name + Containers: []string{"c1"}, + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100"), + }, + }} + verifyPodNodeAllocatableStatus(tCtx, env.namespace, pod.Name, expectedStatus) +} + +func testNodeAllocatableUnreferencedClaimInPod(tCtx ktesting.TContext) { + tCtx.Parallel() + env := setupTestEnv(tCtx, 7) // Use a different node index + + cpuCapacityKey := resourceapi.QualifiedName("dra.example.com/cpu") + devices := []resourceapi.Device{ + { + Name: "dev0", + AllowMultipleAllocations: ptr.To(true), + Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + cpuCapacityKey: {Value: resource.MustParse(nodeCPUCapacity)}, + }, + NodeAllocatableResourceMappings: map[v1.ResourceName]resourceapi.NodeAllocatableResourceMapping{ + v1.ResourceCPU: {CapacityKey: &cpuCapacityKey}, + }, + }, + } + slice := makeSlice(env, devices) + env.poolName = env.namespace + "-pool-unused" + slice.Spec.Pool.Name = env.poolName + createSliceAndStartScheduler(tCtx, slice) + startClaimController(tCtx) + + claimName := "unused-claim" + podName := "pod-with-unused-claim" + + // Create a ResourceClaim for the node allocatable resource class + claim := st.MakeResourceClaim(). + Name(claimName). + Namespace(env.namespace). + Request(env.class.Name). + Obj() + claim = createClaim(tCtx, env.namespace, "", env.class, claim) + + // Create a Pod that has the node allocatable claim in spec.ResourceClaims, but no container uses it. + pod := st.MakePod().Name(podName).Namespace(env.namespace). + Containers([]v1.Container{{Name: "c1", Image: "test"}}). + NodeSelector(map[string]string{"kubernetes.io/hostname": env.nodeName}). + Obj() + pod = createPod(tCtx, env.namespace, "", pod, claim) + + expectedErrorMsg := "node-allocatable resource claim not referenced by any container within the pod" + expectPodUnschedulable(tCtx, pod, expectedErrorMsg) +}