From d2c2de72e13627ca1abd205cd3ea38eb15efa3b7 Mon Sep 17 00:00:00 2001 From: Ashvin Deodhar Date: Wed, 13 May 2026 06:39:29 -0700 Subject: [PATCH] DRA: account for shared devices when rebuilding counters When DRAConsumableCapacity is enabled, multi-allocatable device allocations are tracked via AllocatedSharedDeviceIDs and AggregatedCapacity instead of AllocatedDevices. The incubating and experimental allocators only checked AllocatedDevices when rebuilding available SharedCounters, so committed shared allocations could be missed across scheduling cycles. Add an internal IsDeviceAllocated helper that checks all committed allocation-state sources, and use it when reconstructing consumed counters. Add a regression test for a fresh allocator seeing committed shared-device counter consumption. --- .../structured/allocator.go | 29 +---------- .../structured/internal/allocatedstate.go | 26 ++++++++++ .../allocatortesting/allocator_testing.go | 51 +++++++++++++++++++ .../experimental/allocator_experimental.go | 2 +- .../incubating/allocator_incubating.go | 2 +- 5 files changed, 81 insertions(+), 29 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index ca252a9bbb4..e4f2d72b5e3 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -260,32 +260,7 @@ func NodeMatches(features Features, node *v1.Node, nodeNameToMatch string, allNo return false, fmt.Errorf("internal error: no NodeMatches implementation available for feature set %v", features) } -// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices -// and partially consumed devices when consumable capacity is enabled. +// IsDeviceAllocated checks if a device has committed allocation state. func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool { - // Check if device is fully allocated (traditional case) - if allocatedState.AllocatedDevices.Has(deviceID) { - return true - } - - // Check if device is partially consumed via shared allocations (consumable capacity case). - // We need to check if any shared device ID corresponds to our device. - for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs { - // Extract the base device ID from the shared device ID by recreating it - baseDeviceID := MakeDeviceID( - sharedDeviceID.Driver.String(), - sharedDeviceID.Pool.String(), - sharedDeviceID.Device.String(), - ) - if baseDeviceID == deviceID { - return true - } - } - - // Check if device has consumed capacity tracked (consumable capacity case) - if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity { - return true - } - - return false + return internal.IsDeviceAllocated(deviceID, allocatedState) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go index ad7ec445858..d02b916ec7e 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go @@ -58,6 +58,32 @@ func NewDeviceConsumedCapacity(deviceID DeviceID, consumedCapacity map[resourcea return schedulerapi.NewDeviceConsumedCapacity(deviceID, consumedCapacity) } +// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices +// and partially consumed devices when consumable capacity is enabled. +func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool { + // Check if device is fully allocated (traditional case). + if allocatedState.AllocatedDevices.Has(deviceID) { + return true + } + + // Check if device is partially consumed via shared allocations (consumable capacity case). + // We need to check if any shared device ID corresponds to our device. + for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs { + if sharedDeviceID.GetDeviceID() == deviceID { + return true + } + } + + // For scheduler-generated state, consumed capacity is recorded together with + // a shared device ID. Keep this check to preserve IsDeviceAllocated semantics + // and handle manually constructed or future AllocatedState producers. + if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity { + return true + } + + return false +} + // GenerateShareID is a helper function that generates a new share ID. // This remains in the internal package as it's a utility function. func GenerateShareID() *types.UID { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go index b10509031e3..c6a68c2043d 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go @@ -5425,6 +5425,57 @@ func TestAllocator(t *testing.T, deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}), )}, }, + "allow-multiple-allocations-with-shared-device-counter-cross-cycle": { + features: Features{ + PartitionableDevices: true, + ConsumableCapacity: true, + }, + claimsToAllocate: objects( + claim(claim0).withRequests( + deviceRequest(req0, classA, 1). + withCapacityRequest(ptr.To(one)). + withSelectors(resourceapi.DeviceSelector{ + CEL: &resourceapi.CELDeviceSelector{ + Expression: fmt.Sprintf(`device.attributes["%s"].mode == "b"`, driverA), + }, + }), + ), + ), + allocatedSharedDeviceIDs: sets.New( + internal.MakeSharedDeviceID(MakeDeviceID(driverA, pool1, device1), &fixedShareID), + ), + allocatedCapacityDevices: ConsumedCapacityCollection{ + MakeDeviceID(driverA, pool1, device1): ConsumedCapacity{ + capacity0: ptr.To(one), + }, + }, + classes: objects(classWithAllowMultipleAllocations(classA, driverA, true)), + slices: unwrapResourceSlices( + sliceWithDevices(slice1, node1, resourcePool(pool1, 2), driverA, + device(device1, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "mode": {StringValue: ptr.To("a")}, + }).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, map[string]resource.Quantity{ + capacity0: one, + }), + ).withAllowMultipleAllocations(), + device(device2, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "mode": {StringValue: ptr.To("b")}, + }).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, map[string]resource.Quantity{ + capacity0: one, + }), + ).withAllowMultipleAllocations(), + ), + sliceWithCounterSets(slice2, node1, resourcePool(pool1, 2), driverA, + counterSet(counterSet1, map[string]resource.Quantity{ + capacity0: one, + }), + ), + ), + node: node(node1, region1), + expectResults: []any{}, + }, "consumable-capacity-with-partitionable-device-multiple-capacity-pools": { // This test case combines integration of PrioritizedList, PartitionableDevices, and ConsumableCapacity features. features: Features{ diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go index 4540e795095..5f388736a65 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go @@ -1650,7 +1650,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error } // Devices that aren't allocated doesn't consume any counters, so we don't // need to consider them. - if !alloc.allocatedState.AllocatedDevices.Has(deviceID) { + if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) { continue } for _, deviceCounterConsumption := range device.ConsumesCounters { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go index 0a9013320b0..c23708ac291 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go @@ -1522,7 +1522,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error } // Devices that aren't allocated doesn't consume any counters, so we don't // need to consider them. - if !alloc.allocatedState.AllocatedDevices.Has(deviceID) { + if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) { continue } for _, deviceCounterConsumption := range device.ConsumesCounters {