diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index c01507e23d8..ad9b49d775c 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -668,7 +668,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, if err != nil { return nil, statusError(logger, err) } - features := allocatorFeatures(pl.fts) + features := AllocatorFeatures(pl.fts) allocator, err := structured.NewAllocator(ctx, features, *allocatedState, pl.draManager.DeviceClasses(), slices, pl.celCache) if err != nil { return nil, statusError(logger, err) @@ -680,7 +680,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, return nil, nil } -func allocatorFeatures(fts feature.Features) structured.Features { +func AllocatorFeatures(fts feature.Features) structured.Features { return structured.Features{ AdminAccess: fts.EnableDRAAdminAccess, PrioritizedList: fts.EnableDRAPrioritizedList, diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 3f50cf5cbea..e222f2bb50b 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -3809,7 +3809,7 @@ func TestAllocatorSelection(t *testing.T) { featureGate := utilfeature.DefaultFeatureGate.DeepCopy() tCtx.ExpectNoError(featureGate.Set(tc.features), "set features") fts := feature.NewSchedulerFeaturesFromGates(featureGate) - features := allocatorFeatures(fts) + features := AllocatorFeatures(fts) // Slightly hacky: most arguments are not valid and the constructor // is expected to not use them yet. 
diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go index 8371a0a36f2..48e4caf6c9b 100644 --- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go +++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go @@ -135,6 +135,7 @@ func NewBalancedAllocation(_ context.Context, baArgs runtime.Object, h fwk.Handl Name: BalancedAllocationName, enableInPlacePodVerticalScaling: fts.EnableInPlacePodVerticalScaling, enablePodLevelResources: fts.EnablePodLevelResources, + enableDRAExtendedResource: fts.EnableDRAExtendedResource, scorer: balancedResourceScorer, useRequested: true, resources: args.Resources, diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 7cec2f79f9e..ba584a4471b 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -27,12 +27,14 @@ import ( "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/component-helpers/resource" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" schedutil "k8s.io/kubernetes/pkg/scheduler/util" @@ -173,6 +175,16 @@ func NewFit(_ context.Context, plArgs runtime.Object, h fwk.Handle, fts feature. 
return nil, fmt.Errorf("scoring strategy %s is not supported", strategy) } + scorer := scorePlugin(args) + if fts.EnableDRAExtendedResource { + scorer.enableDRAExtendedResource = true + scorer.draManager = h.SharedDRAManager() + scorer.draFeatures = dynamicresources.AllocatorFeatures(fts) + // Create a CEL cache for device class selector compilation + // This cache improves performance by avoiding recompilation of the same CEL expressions + scorer.celCache = cel.NewCache(10, cel.Features{EnableConsumableCapacity: fts.EnableDRAConsumableCapacity}) + } + return &Fit{ ignoredResources: sets.New(args.IgnoredResources...), ignoredResourceGroups: sets.New(args.IgnoredResourceGroups...), @@ -182,7 +194,7 @@ func NewFit(_ context.Context, plArgs runtime.Object, h fwk.Handle, fts feature. handle: h, enablePodLevelResources: fts.EnablePodLevelResources, enableDRAExtendedResource: fts.EnableDRAExtendedResource, - resourceAllocationScorer: *scorePlugin(args), + resourceAllocationScorer: *scorer, }, nil } diff --git a/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go b/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go index 33a46e5a871..aaf7fa0faba 100644 --- a/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go +++ b/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go @@ -21,11 +21,16 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" + "k8s.io/utils/ptr" + resourceapi "k8s.io/api/resource/v1" resourcehelper "k8s.io/component-helpers/resource" + "k8s.io/dynamic-resource-allocation/structured" fwk "k8s.io/kube-scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/extended" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -37,11 +42,15 @@ type resourceAllocationScorer struct { Name string enableInPlacePodVerticalScaling bool 
enablePodLevelResources bool + enableDRAExtendedResource bool // used to decide whether to use Requested or NonZeroRequested for // cpu and memory. useRequested bool scorer func(requested, allocable []int64) int64 resources []config.ResourceSpec + draFeatures structured.Features + draManager fwk.SharedDRAManager + celCache *cel.Cache } // score will use `scorer` function to calculate the score. @@ -61,7 +70,7 @@ func (r *resourceAllocationScorer) score( requested := make([]int64, len(r.resources)) allocatable := make([]int64, len(r.resources)) for i := range r.resources { - alloc, req := r.calculateResourceAllocatableRequest(logger, nodeInfo, v1.ResourceName(r.resources[i].Name), podRequests[i]) + alloc, req := r.calculateResourceAllocatableRequest(ctx, nodeInfo, v1.ResourceName(r.resources[i].Name), podRequests[i]) // Only fill the extended resource entry when it's non-zero. if alloc == 0 { continue @@ -86,7 +95,7 @@ func (r *resourceAllocationScorer) score( // - 1st param: quantity of allocatable resource on the node. // - 2nd param: aggregated quantity of requested resource on the node. // Note: if it's an extended resource, and the pod doesn't request it, (0, 0) is returned. 
-func (r *resourceAllocationScorer) calculateResourceAllocatableRequest(logger klog.Logger, nodeInfo fwk.NodeInfo, resource v1.ResourceName, podRequest int64) (int64, int64) { +func (r *resourceAllocationScorer) calculateResourceAllocatableRequest(ctx context.Context, nodeInfo fwk.NodeInfo, resource v1.ResourceName, podRequest int64) (int64, int64) { requested := nodeInfo.GetNonZeroRequested() if r.useRequested { requested = nodeInfo.GetRequested() @@ -105,11 +114,20 @@ func (r *resourceAllocationScorer) calculateResourceAllocatableRequest(logger kl case v1.ResourceEphemeralStorage: return nodeInfo.GetAllocatable().GetEphemeralStorage(), (nodeInfo.GetRequested().GetEphemeralStorage() + podRequest) default: - if _, exists := nodeInfo.GetAllocatable().GetScalarResources()[resource]; exists { - return nodeInfo.GetAllocatable().GetScalarResources()[resource], (nodeInfo.GetRequested().GetScalarResources()[resource] + podRequest) + allocatable, exists := nodeInfo.GetAllocatable().GetScalarResources()[resource] + if allocatable == 0 && r.enableDRAExtendedResource { + // Allocatable 0 means that this resource is not handled by device plugin. + // Calculate allocatable and requested for resources backed by DRA. 
+ allocatable, allocated := r.calculateDRAExtendedResourceAllocatableRequest(ctx, nodeInfo.Node(), resource) + if allocatable > 0 { + return allocatable, allocated + podRequest + } + } + if exists { + return allocatable, (nodeInfo.GetRequested().GetScalarResources()[resource] + podRequest) } } - logger.V(10).Info("Requested resource is omitted for node score calculation", "resourceName", resource) + klog.FromContext(ctx).V(10).Info("Requested resource is omitted for node score calculation", "resourceName", resource) return 0, 0 } @@ -155,3 +173,152 @@ func (r *resourceAllocationScorer) isBestEffortPod(podRequests []int64) bool { } return true } + +// calculateDRAExtendedResourceAllocatableRequest calculates allocatable and allocated +// quantities for extended resources backed by DRA. +func (r *resourceAllocationScorer) calculateDRAExtendedResourceAllocatableRequest(ctx context.Context, node *v1.Node, resource v1.ResourceName) (int64, int64) { + logger := klog.FromContext(ctx) + // Get device class mapping to find the device class for this resource + deviceClassMapping, err := extended.DeviceClassMapping(r.draManager) + if err != nil { + logger.Error(err, "Failed to get device class mapping for DRA extended resource scoring") + return 0, 0 + } + + deviceClassName, exists := deviceClassMapping[resource] + if !exists { + logger.Error(nil, "Extended resource not found in device class mapping", "resource", resource) + return 0, 0 + } + + deviceClass, err := r.draManager.DeviceClasses().Get(deviceClassName) + if err != nil { + logger.Error(err, "Failed to get device class for DRA extended resource scoring", "resource", resource, "deviceClass", deviceClassName) + return 0, 0 + } + + capacity, allocated, err := r.calculateDRAResourceTotals(ctx, node, deviceClass) + if err != nil { + logger.Error(err, "Failed to calculate DRA resource capacity and allocated", "node", node.Name, "resource", resource, "deviceClass", deviceClassName) + return 0, 0 + } + + 
logger.V(7).Info("DRA extended resource calculation", "node", node.Name, "resource", resource, "deviceClass", deviceClassName, "capacity", capacity, "allocated", allocated) + return capacity, allocated +} + +// calculateDRAResourceTotals computes the total capacity and total allocated count of devices +// matching the specified Device Class on the given node. It queries the DRA manager for resource +// slices and allocated devices, filters devices by class and driver, and returns the counts. +// Returns an error if resource information cannot be retrieved or if node matching fails. +// +// Parameters: +// +// ctx - context for cancellation and deadlines +// node - the node to evaluate device resources on +// deviceClass - the device class to filter devices by +// +// Returns: +// +// totalCapacity - total number of devices matching the device class on the node +// totalAllocated - number of devices currently allocated from the matching set +// error - any error encountered during processing +func (r *resourceAllocationScorer) calculateDRAResourceTotals(ctx context.Context, node *v1.Node, deviceClass *resourceapi.DeviceClass) (int64, int64, error) { + allocatedState, err := r.draManager.ResourceClaims().GatherAllocatedState() + if err != nil { + return 0, 0, err + } + + resourceSlices, err := r.draManager.ResourceSlices().ListWithDeviceTaintRules() + if err != nil { + return 0, 0, err + } + + var totalCapacity, totalAllocated int64 + for _, slice := range resourceSlices { + driver := slice.Spec.Driver + pool := slice.Spec.Pool.Name + var devices []resourceapi.Device + // Handle per-device node selection vs slice-level node selection + if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { + devices = []resourceapi.Device{} + // When per-device node selection is enabled, check each device individually + for _, device := range slice.Spec.Devices { + // Check if this specific device matches the node + deviceMatches, err := structured.NodeMatches(r.draFeatures, node, 
ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) + if err != nil { + return 0, 0, err + } + if deviceMatches { + devices = append(devices, device) + } + } + } else { + // When per-device node selection is disabled, check slice-level node selection first + matches, err := structured.NodeMatches(r.draFeatures, node, ptr.Deref(slice.Spec.NodeName, ""), ptr.Deref(slice.Spec.AllNodes, false), slice.Spec.NodeSelector) + if err != nil { + return 0, 0, err + } + if !matches { + // Skip this slice as it doesn't match the node + continue + } + devices = slice.Spec.Devices + } + // Count devices that match the device class + for _, device := range devices { + matches, err := r.deviceMatchesClass(ctx, device, deviceClass, driver) + if err != nil { + return 0, 0, err + } + if matches { + totalCapacity++ + // Count allocated devices (both fully allocated and partially consumed) + deviceID := structured.MakeDeviceID(driver, pool, device.Name) + if structured.IsDeviceAllocated(deviceID, allocatedState) { + totalAllocated++ + } + } + } + } + + return totalCapacity, totalAllocated, nil +} + +// deviceMatchesClass checks if a device matches the selectors of a device class. +// Note: This method assumes the device class has ExtendedResourceName set, as filtering +// should be done by the caller to ensure we only process DRA resources meant for extended +// resource scoring. 
+func (r *resourceAllocationScorer) deviceMatchesClass(ctx context.Context, device resourceapi.Device, deviceClass *resourceapi.DeviceClass, driver string) (bool, error) {
+	// A class without selectors matches every device.
+	if len(deviceClass.Spec.Selectors) == 0 {
+		return true, nil
+	}
+
+	// Every CEL selector must match; selectors without a CEL expression are skipped.
+	for _, selector := range deviceClass.Spec.Selectors {
+		if selector.CEL == nil {
+			continue
+		}
+
+		// Compile through the shared cache; a compile error is returned to the caller.
+		result := r.celCache.GetOrCompile(selector.CEL.Expression)
+		if result.Error != nil {
+			return false, result.Error
+		}
+
+		// Evaluate the expression against the driver name, attributes and capacity of the device.
+		celDevice := cel.Device{
+			Driver:     driver,
+			Attributes: device.Attributes,
+			Capacity:   device.Capacity,
+		}
+		// A runtime evaluation error is returned as well; a clean non-match yields (false, nil).
+		matches, _, err := result.DeviceMatches(ctx, celDevice)
+		if err != nil || !matches {
+			return false, err
+		}
+	}
+
+	return true, nil
+}
diff --git a/pkg/scheduler/framework/plugins/noderesources/resource_allocation_test.go b/pkg/scheduler/framework/plugins/noderesources/resource_allocation_test.go
index f13efe21d35..25e2eb996d9 100644
--- a/pkg/scheduler/framework/plugins/noderesources/resource_allocation_test.go
+++ b/pkg/scheduler/framework/plugins/noderesources/resource_allocation_test.go
@@ -17,12 +17,29 @@ limitations under the License.
package noderesources import ( + "context" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" "k8s.io/apimachinery/pkg/api/resource" - + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/dynamic-resource-allocation/cel" + "k8s.io/dynamic-resource-allocation/resourceslice/tracker" + "k8s.io/dynamic-resource-allocation/structured" + "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources" + st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" + "k8s.io/utils/ptr" ) func TestResourceAllocationScorerCalculateRequests(t *testing.T) { @@ -238,3 +255,331 @@ func TestResourceAllocationScorerCalculateRequests(t *testing.T) { }) } } + +func TestCalculateResourceAllocatableRequest(t *testing.T) { + // Initialize test variables + nodeName := "resource-node" + driverName := "test-driver" + testClaim := "test-claim" + explicitExtendedResource := v1.ResourceName(extendedResourceName) + implicitExtendedResource := v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + deviceClassName) + celCache := cel.NewCache(10, cel.Features{EnableConsumableCapacity: true}) + draFeatures := structured.Features{ + AdminAccess: true, + PrioritizedList: true, + PartitionableDevices: true, + DeviceTaints: true, + DeviceBindingAndStatus: true, + ConsumableCapacity: true, + } + + // Define test cases + tests := map[string]struct { + enableDRAExtendedResource bool + node *v1.Node + extendedResource v1.ResourceName + objects []apiruntime.Object + podRequest int64 + expectedAllocatable int64 + expectedRequested int64 + }{ + "device-plugin-resource-feature-disabled": { + 
enableDRAExtendedResource: false, + node: st.MakeNode().Name(nodeName).Capacity(map[v1.ResourceName]string{explicitExtendedResource: "4"}).Obj(), + extendedResource: explicitExtendedResource, + podRequest: 1, + expectedAllocatable: 4, + expectedRequested: 1, + }, + "device-plugin-resource-feature-enabled": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Capacity(map[v1.ResourceName]string{explicitExtendedResource: "4"}).Obj(), + extendedResource: explicitExtendedResource, + podRequest: 1, + expectedAllocatable: 4, + expectedRequested: 1, + }, + "DRA-backed-resource-explicit": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Obj(), + extendedResource: explicitExtendedResource, + objects: []apiruntime.Object{ + deviceClassWithExtendResourceName, + st.MakeResourceSlice(nodeName, driverName).Device("device-1").Obj(), + }, + podRequest: 1, + expectedAllocatable: 1, + expectedRequested: 1, + }, + "DRA-backed-resource-implicit": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Obj(), + extendedResource: implicitExtendedResource, + objects: []apiruntime.Object{ + &resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: deviceClassName, + }, + }, + st.MakeResourceSlice(nodeName, driverName).Device("device-1").Obj(), + }, + podRequest: 1, + expectedAllocatable: 1, + expectedRequested: 1, + }, + "DRA-backed-resource-no-slices": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Obj(), + extendedResource: explicitExtendedResource, + objects: []apiruntime.Object{deviceClassWithExtendResourceName}, + podRequest: 1, + expectedAllocatable: 0, + expectedRequested: 0, + }, + "DRA-backed-resource-with-allocated-device": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Obj(), + extendedResource: explicitExtendedResource, + objects: []apiruntime.Object{ + deviceClassWithExtendResourceName, + st.MakeResourceSlice(nodeName, driverName).Devices("device-1", 
"device-2").Obj(), + // Create a resource claim that fully allocates device-1 + st.MakeResourceClaim(). + Name(testClaim). + Request(deviceClassName). + Allocation(&resourceapi.AllocationResult{ + Devices: resourceapi.DeviceAllocationResult{ + Results: []resourceapi.DeviceRequestAllocationResult{ + { + Request: "req-1", + Driver: driverName, + Pool: nodeName, + Device: "device-1", + }, + }, + }, + }). + Obj(), + }, + podRequest: 1, + expectedAllocatable: 2, + expectedRequested: 2, // 1 allocated + 1 requested + }, + "DRA-backed-resource-with-shared-device-allocation": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Obj(), + extendedResource: explicitExtendedResource, + objects: []apiruntime.Object{ + deviceClassWithExtendResourceName, + st.MakeResourceSlice(nodeName, driverName).Devices("device-1", "device-2").Obj(), + // Create a resource claim with shared device allocation (consumable capacity) + st.MakeResourceClaim(). + Name(testClaim). + Request(deviceClassName). + Allocation(&resourceapi.AllocationResult{ + Devices: resourceapi.DeviceAllocationResult{ + Results: []resourceapi.DeviceRequestAllocationResult{ + { + Request: "req-1", + Driver: driverName, + Pool: nodeName, + Device: "device-1", + ShareID: ptr.To(types.UID("share-123")), // Shared device allocation + }, + }, + }, + }). + Obj(), + }, + podRequest: 1, + expectedAllocatable: 2, + expectedRequested: 2, // 1 allocated (shared) + 1 requested + }, + "DRA-backed-resource-multiple-devices-mixed-allocation": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Obj(), + extendedResource: explicitExtendedResource, + objects: []apiruntime.Object{ + deviceClassWithExtendResourceName, + st.MakeResourceSlice(nodeName, driverName).Devices("device-1", "device-2", "device-3").Obj(), + // Mix of fully allocated and shared device allocations + st.MakeResourceClaim(). + Name("test-claim-1"). + Request(deviceClassName). 
+ Allocation(&resourceapi.AllocationResult{ + Devices: resourceapi.DeviceAllocationResult{ + Results: []resourceapi.DeviceRequestAllocationResult{ + { + Request: "req-1", + Driver: driverName, + Pool: nodeName, + Device: "device-1", + // No ShareID = fully allocated device + }, + }, + }, + }). + Obj(), + st.MakeResourceClaim(). + Name("test-claim-2"). + Request(deviceClassName). + Allocation(&resourceapi.AllocationResult{ + Devices: resourceapi.DeviceAllocationResult{ + Results: []resourceapi.DeviceRequestAllocationResult{ + { + Request: "req-1", + Driver: driverName, + Pool: nodeName, + Device: "device-2", + ShareID: ptr.To(types.UID("share-456")), // Shared device allocation + }, + }, + }, + }). + Obj(), + // device-3 remains unallocated + }, + podRequest: 1, + expectedAllocatable: 3, + expectedRequested: 3, // 2 allocated (1 full + 1 shared) + 1 requested + }, + "DRA-backed-resource-with-per-device-node-selection": { + enableDRAExtendedResource: true, + node: st.MakeNode().Name(nodeName).Obj(), + extendedResource: explicitExtendedResource, + objects: []apiruntime.Object{ + &resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: deviceClassName, + }, + Spec: resourceapi.DeviceClassSpec{ + ExtendedResourceName: &extendedResourceName, + Selectors: []resourceapi.DeviceSelector{ + { + CEL: &resourceapi.CELDeviceSelector{ + // Realistic GPU selection: match test driver with at least 8GB memory and SOME- model + Expression: `device.driver == "test-driver" && device.capacity["test-driver"].memory.compareTo(quantity("8Gi")) >= 0 && device.attributes["test-driver"].model.startsWith("SOME-")`, + }, + }, + }, + }, + }, + // Create a custom resource slice with per-device node selection + &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{Name: "per-device-slice"}, + Spec: resourceapi.ResourceSliceSpec{ + Driver: driverName, // Match the CEL expression driver + Pool: resourceapi.ResourcePool{Name: "per-device-pool", ResourceSliceCount: 1}, + 
PerDeviceNodeSelection: ptr.To(true), // Enable per-device node selection + // Note: No NodeName, AllNodes, or NodeSelector at slice level when PerDeviceNodeSelection is true + Devices: []resourceapi.Device{ + { + Name: "device-1", + NodeName: ptr.To(nodeName), // This device matches the test node + Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "model": {StringValue: ptr.To("SOME-XYZ")}, + }, + Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + "memory": {Value: resource.MustParse("16Gi")}, // 16GB GPU - matches CEL (>= 8GB) + }, + }, + { + Name: "device-2", + NodeName: ptr.To("other-node"), // This device matches a different node + Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "model": {StringValue: ptr.To("SOME-ZYX")}, + }, + Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + "memory": {Value: resource.MustParse("12Gi")}, // 12GB GPU - matches CEL (>= 8GB) + }, + }, + { + Name: "device-3", + AllNodes: ptr.To(true), // This device matches all nodes + Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "model": {StringValue: ptr.To("SOME-XZY")}, + }, + Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + "memory": {Value: resource.MustParse("24Gi")}, // 24GB GPU - matches CEL (>= 8GB) + }, + }, + }, + }, + }, + // Create a resource claim that allocates gpu-1 + st.MakeResourceClaim(). + Name("test-claim"). + Request(deviceClassName). + Allocation(&resourceapi.AllocationResult{ + Devices: resourceapi.DeviceAllocationResult{ + Results: []resourceapi.DeviceRequestAllocationResult{ + { + Request: "req-1", + Driver: driverName, + Pool: "per-device-pool", + Device: "device-1", + }, + }, + }, + }). 
+ Obj(), + }, + podRequest: 1, + expectedAllocatable: 2, // Only device-1 (matches test-node) and device-3 (matches all nodes) + expectedRequested: 2, // 1 allocated (device-1) + 1 requested + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + // Setup environment, create required objects + logger, tCtx := ktesting.NewTestContext(t) + tCtx, cancel := context.WithCancel(tCtx) + defer cancel() + + client := fake.NewClientset(tc.objects...) + informerFactory := informers.NewSharedInformerFactory(client, 0) + resourceSliceTrackerOpts := tracker.Options{ + SliceInformer: informerFactory.Resource().V1().ResourceSlices(), + TaintInformer: informerFactory.Resource().V1alpha3().DeviceTaintRules(), + ClassInformer: informerFactory.Resource().V1().DeviceClasses(), + KubeClient: client, + } + resourceSliceTracker, err := tracker.StartTracker(tCtx, resourceSliceTrackerOpts) + require.NoError(t, err, "couldn't start resource slice tracker") + draManager := dynamicresources.NewDRAManager( + tCtx, + assumecache.NewAssumeCache( + logger, + informerFactory.Resource().V1().ResourceClaims().Informer(), + "resource claim", + "", + nil), + resourceSliceTracker, + informerFactory) + + informerFactory.Start(tCtx.Done()) + t.Cleanup(func() { + // Now we can wait for all goroutines to stop. 
+ informerFactory.Shutdown() + }) + informerFactory.WaitForCacheSync(tCtx.Done()) + + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(tc.node) + + scorer := &resourceAllocationScorer{ + enableDRAExtendedResource: tc.enableDRAExtendedResource, + draManager: draManager, + draFeatures: draFeatures, + celCache: celCache, + } + + // Test calculateResourceAllocatableRequest API + allocatable, requested := scorer.calculateResourceAllocatableRequest(tCtx, nodeInfo, tc.extendedResource, tc.podRequest) + assert.Equal(t, tc.expectedAllocatable, allocatable) + assert.Equal(t, tc.expectedRequested, requested) + }) + } +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index 387632f390f..3c804d7b818 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -140,18 +140,18 @@ func NewAllocator(ctx context.Context, // file name!) into "stable", or individual chunks can be copied over. // // Unit tests are shared between all implementations. - var enabledAllocators []string - for _, allocator := range availableAllocators { + enabledAllocators := []string{} + for _, api := range availableAPIs { // Disabled? - if !allocatorEnabled(allocator.name) { + if !allocatorEnabled(api.name) { continue } - enabledAllocators = append(enabledAllocators, allocator.name) + enabledAllocators = append(enabledAllocators, api.name) // All required features supported? - if allocator.supportedFeatures.Set().IsSuperset(features.Set()) { + if api.supportedFeatures.Set().IsSuperset(features.Set()) { // Use it! 
- return allocator.newAllocator(ctx, features, allocatedState, classLister, slices, celCache) + return api.newAllocator(ctx, features, allocatedState, classLister, slices, celCache) } } return nil, fmt.Errorf("internal error: no allocator available for feature set %+v, enabled allocators: %s", features, strings.Join(enabledAllocators, ", ")) @@ -172,7 +172,7 @@ func allocatorEnabled(name string) bool { return len(explicitlyEnabledAllocators) == 0 || explicitlyEnabledAllocators.Has(name) } -var availableAllocators = []struct { +var availableAPIs = []struct { name string supportedFeatures Features newAllocator func(ctx context.Context, @@ -182,6 +182,11 @@ var availableAllocators = []struct { slices []*resourceapi.ResourceSlice, celCache *cel.Cache, ) (Allocator, error) + nodeMatches func(node *v1.Node, + nodeNameToMatch string, + allNodesMatch bool, + nodeSelector *v1.NodeSelector, + ) (bool, error) }{ // Most stable first. { @@ -196,6 +201,7 @@ var availableAllocators = []struct { ) (Allocator, error) { return stable.NewAllocator(ctx, features, allocatedState.AllocatedDevices, classLister, slices, celCache) }, + nodeMatches: stable.NodeMatches, }, { name: "incubating", @@ -209,6 +215,7 @@ var availableAllocators = []struct { ) (Allocator, error) { return incubating.NewAllocator(ctx, features, allocatedState.AllocatedDevices, classLister, slices, celCache) }, + nodeMatches: incubating.NodeMatches, }, { name: "experimental", @@ -222,5 +229,49 @@ var availableAllocators = []struct { ) (Allocator, error) { return experimental.NewAllocator(ctx, features, allocateState, classLister, slices, celCache) }, + nodeMatches: experimental.NodeMatches, }, } + +// NodeMatches determines whether a given Kubernetes node matches the specified criteria. +// It calls one of the available implementations(stable, incubating, experimental) based +// on the provided DRA features. 
+func NodeMatches(features Features, node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) {
+	for _, api := range availableAPIs {
+		if api.supportedFeatures.Set().IsSuperset(features.Set()) {
+			return api.nodeMatches(node, nodeNameToMatch, allNodesMatch, nodeSelector)
+		}
+	}
+	// The first entry of availableAPIs that supports all requested features wins; none did.
+	return false, fmt.Errorf("internal error: no NodeMatches API available for feature set %v", features)
+}
+
+// IsDeviceAllocated reports whether deviceID is in AllocatedDevices, matches the driver, pool and device
+// of an entry in AllocatedSharedDeviceIDs, or has an entry in AggregatedCapacity. allocatedState must not be nil.
+func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool {
+	// Case 1: the device is recorded as allocated as a whole.
+	if allocatedState.AllocatedDevices.Has(deviceID) {
+		return true
+	}
+
+	// Case 2: some shared allocation refers to this device (consumable capacity).
+	// Every shared device ID is visited, so this is linear in the number of shared allocations.
+	for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs {
+		// Rebuild a plain DeviceID from the driver, pool and device of the shared ID so the two can be compared.
+		baseDeviceID := MakeDeviceID(
+			sharedDeviceID.Driver.String(),
+			sharedDeviceID.Pool.String(),
+			sharedDeviceID.Device.String(),
+		)
+		if baseDeviceID == deviceID {
+			return true
+		}
+	}
+
+	// Case 3: consumed capacity is tracked for the device; the presence of the key alone counts.
+	if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity {
+		return true
+	}
+
+	return false
+}
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go
index b618d7aabe0..8907cc00f5b 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go
+++
b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go @@ -1148,7 +1148,7 @@ func (alloc *allocator) isSelectable(r requestIndices, requestData requestData, } if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { - matches, err := nodeMatches(alloc.node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) + matches, err := NodeMatches(alloc.node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) if err != nil { return false, err } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/pools_experimental.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/pools_experimental.go index a124ea7a027..2e7e7e09fd5 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/pools_experimental.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/pools_experimental.go @@ -28,7 +28,7 @@ import ( "k8s.io/utils/ptr" ) -func nodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) { +func NodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) { switch { case nodeNameToMatch != "": return node != nil && node.Name == nodeNameToMatch, nil @@ -61,7 +61,7 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node } if nodeName, allNodes := ptr.Deref(slice.Spec.NodeName, ""), ptr.Deref(slice.Spec.AllNodes, false); nodeName != "" || allNodes || slice.Spec.NodeSelector != nil { - match, err := nodeMatches(node, nodeName, allNodes, slice.Spec.NodeSelector) + match, err := NodeMatches(node, nodeName, allNodes, slice.Spec.NodeSelector) if err != nil { return nil, fmt.Errorf("failed to perform node selection for slice %s: %w", slice.Name, err) } @@ -80,7 +80,7 @@ func GatherPools(ctx 
context.Context, slices []*resourceapi.ResourceSlice, node } } else if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { for _, device := range slice.Spec.Devices { - match, err := nodeMatches(node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) + match, err := NodeMatches(node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) if err != nil { return nil, fmt.Errorf("failed to perform node selection for device %s in slice %s: %w", device.String(), slice.Name, err) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go index 6c4188cc2c0..33a712845e0 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go @@ -1017,7 +1017,7 @@ func (alloc *allocator) isSelectable(r requestIndices, requestData requestData, } if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { - matches, err := nodeMatches(alloc.node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) + matches, err := NodeMatches(alloc.node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) if err != nil { return false, err } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/pools_incubating.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/pools_incubating.go index a7ea15d75c6..59465de3402 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/pools_incubating.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/pools_incubating.go @@ -28,7 +28,7 @@ import ( "k8s.io/utils/ptr" ) -func 
nodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) { +func NodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) { switch { case nodeNameToMatch != "": return node != nil && node.Name == nodeNameToMatch, nil @@ -60,7 +60,7 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node } if nodeName, allNodes := ptr.Deref(slice.Spec.NodeName, ""), ptr.Deref(slice.Spec.AllNodes, false); nodeName != "" || allNodes || slice.Spec.NodeSelector != nil { - match, err := nodeMatches(node, nodeName, allNodes, slice.Spec.NodeSelector) + match, err := NodeMatches(node, nodeName, allNodes, slice.Spec.NodeSelector) if err != nil { return nil, fmt.Errorf("failed to perform node selection for slice %s: %w", slice.Name, err) } @@ -71,7 +71,7 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node } } else if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { for _, device := range slice.Spec.Devices { - match, err := nodeMatches(node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) + match, err := NodeMatches(node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) if err != nil { return nil, fmt.Errorf("failed to perform node selection for device %s in slice %s: %w", device.String(), slice.Name, err) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/allocator_stable.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/allocator_stable.go index 40e2237e9b4..83df31f93a4 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/allocator_stable.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/allocator_stable.go @@ -926,7 +926,7 @@ func (alloc *allocator) isSelectable(r requestIndices, requestData requestData, 
} if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { - matches, err := nodeMatches(alloc.node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) + matches, err := NodeMatches(alloc.node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) if err != nil { return false, err } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/pools_stable.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/pools_stable.go index 7bf3c880471..951c7234219 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/pools_stable.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/pools_stable.go @@ -28,7 +28,7 @@ import ( "k8s.io/utils/ptr" ) -func nodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) { +func NodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) { switch { case nodeNameToMatch != "": return node != nil && node.Name == nodeNameToMatch, nil @@ -60,7 +60,7 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node } if nodeName, allNodes := ptr.Deref(slice.Spec.NodeName, ""), ptr.Deref(slice.Spec.AllNodes, false); nodeName != "" || allNodes || slice.Spec.NodeSelector != nil { - match, err := nodeMatches(node, nodeName, allNodes, slice.Spec.NodeSelector) + match, err := NodeMatches(node, nodeName, allNodes, slice.Spec.NodeSelector) if err != nil { return nil, fmt.Errorf("failed to perform node selection for slice %s: %w", slice.Name, err) } @@ -71,7 +71,7 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node } } else if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { for _, device := range slice.Spec.Devices { - match, err := nodeMatches(node, ptr.Deref(device.NodeName, ""), 
ptr.Deref(device.AllNodes, false), device.NodeSelector) + match, err := NodeMatches(node, ptr.Deref(device.NodeName, ""), ptr.Deref(device.AllNodes, false), device.NodeSelector) if err != nil { return nil, fmt.Errorf("failed to perform node selection for device %s in slice %s: %w", device.String(), slice.Name, err)