From fb228c4704843dd2fa584a7ccfa630dfb7f3fde2 Mon Sep 17 00:00:00 2001 From: Sunyanan Choochotkaew Date: Wed, 17 Sep 2025 15:37:52 +0900 Subject: [PATCH] Fix DRAConsumableCapacity to be able to allocate the same device that previously consumed the counterSet Signed-off-by: Sunyanan Choochotkaew --- .../allocatortesting/allocator_testing.go | 150 ++++++++++++++++-- .../experimental/allocator_experimental.go | 15 +- 2 files changed, 151 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go index 3b6d4d38cb0..6acaf27710b 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go @@ -4876,17 +4876,45 @@ func TestAllocator(t *testing.T, }, "allow-multiple-allocations-with-partitionable-device": { features: Features{ + PrioritizedList: true, PartitionableDevices: true, ConsumableCapacity: true, }, claimsToAllocate: objects( - claimWithRequests(claim0, nil, request(req0, classA, 1)), - claimWithRequests(claim1, nil, request(req0, classA, 1)), + claim(claim0).withRequests( + deviceRequest(req0, classA, 1), + requestWithPrioritizedList(req1, + subRequest(subReq0, classA, 1, resourceapi.DeviceSelector{ + CEL: &resourceapi.CELDeviceSelector{ + Expression: fmt.Sprintf(`device.capacity["%s"].memory.compareTo(quantity("6Gi")) >= 0`, driverA), + }}, + ), + subRequest(subReq1, classA, 1, resourceapi.DeviceSelector{ + CEL: &resourceapi.CELDeviceSelector{ + Expression: fmt.Sprintf(`device.capacity["%s"].memory.compareTo(quantity("4Gi")) >= 0`, driverA), + }}, + ), + ), + ), ), - classes: objects(class(classA, driverA)), + classes: objects(classWithAllowMultipleAllocations(classA, driverA, true)), slices: unwrap( slice(slice1, node1, pool1, driverA, - device(device1, nil, nil).withDeviceCounterConsumption( + device(device1, fromCounters, nil).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, + map[string]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + }, + ), + ).withAllowMultipleAllocations(), + device(device2, fromCounters, nil).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, + map[string]resource.Quantity{ + "memory": resource.MustParse("6Gi"), + }, + ), + ).withAllowMultipleAllocations(), + device(device3, fromCounters, nil).withDeviceCounterConsumption( deviceCounterConsumption(counterSet1, map[string]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -4902,16 +4930,112 @@ func TestAllocator(t *testing.T, ), ), node: node(node1, region1), - expectResults: []any{ - allocationResult( - localNodeSelector(node1), - deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, nil), - ), - allocationResult( - localNodeSelector(node1), - deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, nil), - ), + expectResults: []any{allocationResult( + localNodeSelector(node1), + deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}), + deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}), + )}, + }, + "consumable-capacity-with-partitionable-device-multiple-capacity-pools": { + // This test case combines integration of PrioritizedList, PartitionableDevices, and ConsumableCapacity features. + features: Features{ + PrioritizedList: true, + PartitionableDevices: true, + ConsumableCapacity: true, }, + // There are two basic requests (req0 and req2), requesting 1 and 2 units of capacity0, respectively. + // In addition, there is one request (req1) with a prioritized list. + // The prioritized request prefers devices with capacity0 >= 4 over those with capacity0 >= 2. + claimsToAllocate: objects( + claim(claim0).withRequests( + deviceRequest(req0, classA, 1).withCapacityRequest(ptr.To(one)), + deviceRequest(req2, classA, 1).withCapacityRequest(ptr.To(two)), + requestWithPrioritizedList(req1, + subRequest(subReq0, classA, 1, resourceapi.DeviceSelector{ + CEL: &resourceapi.CELDeviceSelector{ + Expression: fmt.Sprintf(`device.capacity["%s"]["%s"].compareTo(quantity("4")) >= 0`, driverA, capacity0), + }}).withCapacityRequest(ptr.To(one)), + subRequest(subReq1, classA, 1, resourceapi.DeviceSelector{ + CEL: &resourceapi.CELDeviceSelector{ + Expression: fmt.Sprintf(`device.capacity["%s"]["%s"].compareTo(quantity("2")) >= 0`, driverA, capacity0), + }}).withCapacityRequest(ptr.To(one)), + ), + ), + ), + classes: objects(class(classA, driverA)), + // There three device option consuming counters from the PartitionableDevices's CounterSet. + // All devices can be allocated multiple times with `allowMultipleAllocations=true`. + // CounterSet (4, 8) + // device1, device2, and device3 consume (2,4), (4,4), and (2,4) for (capacity0, capacity1) respectively. + // i.e., only two options are valid those are [device1, device3] or [device2]. + // Capacity.RequestPolicy of ConsumableCapacity forces the capacity1 consuming with range policy (min,step,max)=(2,2,4). + slices: unwrap( + slice(slice1, node1, pool1, driverA, + device(device1, fromCounters, nil).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, + map[string]resource.Quantity{ + capacity0: two, + }, + ), + deviceCounterConsumption(counterSet2, + map[string]resource.Quantity{ + capacity1: four, + }, + ), + ).withAllowMultipleAllocations().withCapacityRequestPolicyRange((map[resourceapi.QualifiedName]resource.Quantity{capacity1: four})), + device(device2, fromCounters, nil).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, + map[string]resource.Quantity{ + capacity0: four, + }, + ), + deviceCounterConsumption(counterSet2, + map[string]resource.Quantity{ + capacity1: four, + }, + ), + ).withAllowMultipleAllocations().withCapacityRequestPolicyRange((map[resourceapi.QualifiedName]resource.Quantity{capacity1: four})), + device(device3, fromCounters, nil).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, + map[string]resource.Quantity{ + capacity0: two, + }, + ), + deviceCounterConsumption(counterSet2, + map[string]resource.Quantity{ + capacity1: four, + }, + ), + ).withAllowMultipleAllocations().withCapacityRequestPolicyRange((map[resourceapi.QualifiedName]resource.Quantity{capacity1: four})), + ).withCounterSet( + counterSet(counterSet1, + map[string]resource.Quantity{ + capacity0: four, + }, + ), + counterSet(counterSet2, + map[string]resource.Quantity{ + capacity1: resource.MustParse("8"), + }, + ), + ), + ), + node: node(node1, region1), + // Expected results: + // - [req0] The scheduler should be able to allocate device1 to req0. + // 1 unit of capacity0 is consumed according to the requested capacity, and 2 units of capacity1 are consumed according to the RequestPolicy. + // - [req2] device1 does not have enough capacity0 remaining for req2, which requests 2 units (only 1 unit remains). + // The scheduler must also skip device2 due to the CounterSet constraint. + // Therefore, device3 is allocated to req2, providing 2 units of capacity0 and 2 units of capacity1. + // - [req1] The first prioritized subrequest of req1 cannot find a device because device2 cannot be selected. + // The second prioritized subrequest is then chosen and consumes the remaining 1 unit of capacity0 on device1. + // Consequently, device1 is allocated to subreq1 of req1 with 1 unit of capacity0 and 2 units of capacity1. + expectResults: []any{allocationResult( + localNodeSelector(node1), + deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{capacity0: one, capacity1: two}), + deviceRequestAllocationResult(req2, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{capacity0: two, capacity1: two}), + deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{capacity0: one, capacity1: two}), + )}, }, "distinct-constraint-one-multi-allocatable-device-with-distinct-constraint": { features: Features{ diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go index 1b3f3999d9f..0eef3b310df 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go @@ -1235,8 +1235,11 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus return false, nil, nil } + // Skip counter availability check for devices that allow multiple allocation and some capacity has already in-use. + skipCounterCheck := allowMultipleAllocations && alloc.deviceCapacityInUse(device.id) + // The API validation logic has checked the ConsumesCounters referred should exist inside SharedCounters. - if len(device.ConsumesCounters) > 0 { + if !skipCounterCheck && len(device.ConsumesCounters) > 0 { // If a device consumes counters from a counter set, verify that // there is sufficient counters available. ok, err := alloc.checkAvailableCounters(device) @@ -1502,6 +1505,11 @@ func (alloc *allocator) deviceInUse(deviceID DeviceID) bool { return alloc.allocatedState.AllocatedDevices.Has(deviceID) || alloc.allocatingDeviceForAnyClaim(deviceID) } +func (alloc *allocator) deviceCapacityInUse(deviceID DeviceID) bool { + _, found := alloc.allocatedState.AggregatedCapacity[deviceID] + return found || alloc.allocatingCapacityForAnyClaim(deviceID) +} + func (alloc *allocator) allocatingDeviceForAnyClaim(deviceID DeviceID) bool { return alloc.allocatingDevices[deviceID].Len() > 0 } @@ -1510,6 +1518,11 @@ func (alloc *allocator) allocatingDeviceForClaim(deviceID DeviceID, claimIndex i return alloc.allocatingDevices[deviceID].Has(claimIndex) } +func (alloc *allocator) allocatingCapacityForAnyClaim(deviceID DeviceID) bool { + _, found := alloc.allocatingCapacity[deviceID] + return found +} + // deallocateCountersForDevice subtracts the consumed counters of the provided // device from the consumedCounters data structure. func (alloc *allocator) deallocateCountersForDevice(device deviceWithID) {