diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index ca252a9bbb4..e4f2d72b5e3 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -260,32 +260,7 @@ func NodeMatches(features Features, node *v1.Node, nodeNameToMatch string, allNo return false, fmt.Errorf("internal error: no NodeMatches implementation available for feature set %v", features) } -// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices -// and partially consumed devices when consumable capacity is enabled. +// IsDeviceAllocated checks if a device has committed allocation state. func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool { - // Check if device is fully allocated (traditional case) - if allocatedState.AllocatedDevices.Has(deviceID) { - return true - } - - // Check if device is partially consumed via shared allocations (consumable capacity case). - // We need to check if any shared device ID corresponds to our device. - for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs { - // Extract the base device ID from the shared device ID by recreating it - baseDeviceID := MakeDeviceID( - sharedDeviceID.Driver.String(), - sharedDeviceID.Pool.String(), - sharedDeviceID.Device.String(), - ) - if baseDeviceID == deviceID { - return true - } - } - - // Check if device has consumed capacity tracked (consumable capacity case) - if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity { - return true - } - - return false + return internal.IsDeviceAllocated(deviceID, allocatedState) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go index ad7ec445858..d02b916ec7e 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatedstate.go @@ -58,6 +58,32 @@ func NewDeviceConsumedCapacity(deviceID DeviceID, consumedCapacity map[resourcea return schedulerapi.NewDeviceConsumedCapacity(deviceID, consumedCapacity) } +// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices +// and partially consumed devices when consumable capacity is enabled. +func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool { + // Check if device is fully allocated (traditional case). + if allocatedState.AllocatedDevices.Has(deviceID) { + return true + } + + // Check if device is partially consumed via shared allocations (consumable capacity case). + // We need to check if any shared device ID corresponds to our device. + for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs { + if sharedDeviceID.GetDeviceID() == deviceID { + return true + } + } + + // For scheduler-generated state, consumed capacity is recorded together with + // a shared device ID. Keep this check to preserve IsDeviceAllocated semantics + // and handle manually constructed or future AllocatedState producers. + if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity { + return true + } + + return false +} + // GenerateShareID is a helper function that generates a new share ID. // This remains in the internal package as it's a utility function. func GenerateShareID() *types.UID { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go index b10509031e3..c6a68c2043d 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting/allocator_testing.go @@ -5425,6 +5425,57 @@ func TestAllocator(t *testing.T, deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}), )}, }, + "allow-multiple-allocations-with-shared-device-counter-cross-cycle": { + features: Features{ + PartitionableDevices: true, + ConsumableCapacity: true, + }, + claimsToAllocate: objects( + claim(claim0).withRequests( + deviceRequest(req0, classA, 1). + withCapacityRequest(ptr.To(one)). + withSelectors(resourceapi.DeviceSelector{ + CEL: &resourceapi.CELDeviceSelector{ + Expression: fmt.Sprintf(`device.attributes["%s"].mode == "b"`, driverA), + }, + }), + ), + ), + allocatedSharedDeviceIDs: sets.New( + internal.MakeSharedDeviceID(MakeDeviceID(driverA, pool1, device1), &fixedShareID), + ), + allocatedCapacityDevices: ConsumedCapacityCollection{ + MakeDeviceID(driverA, pool1, device1): ConsumedCapacity{ + capacity0: ptr.To(one), + }, + }, + classes: objects(classWithAllowMultipleAllocations(classA, driverA, true)), + slices: unwrapResourceSlices( + sliceWithDevices(slice1, node1, resourcePool(pool1, 2), driverA, + device(device1, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "mode": {StringValue: ptr.To("a")}, + }).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, map[string]resource.Quantity{ + capacity0: one, + }), + ).withAllowMultipleAllocations(), + device(device2, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "mode": {StringValue: ptr.To("b")}, + }).withDeviceCounterConsumption( + deviceCounterConsumption(counterSet1, map[string]resource.Quantity{ + capacity0: one, + }), + ).withAllowMultipleAllocations(), + ), + sliceWithCounterSets(slice2, node1, resourcePool(pool1, 2), driverA, + counterSet(counterSet1, map[string]resource.Quantity{ + capacity0: one, + }), + ), + ), + node: node(node1, region1), + expectResults: []any{}, + }, "consumable-capacity-with-partitionable-device-multiple-capacity-pools": { // This test case combines integration of PrioritizedList, PartitionableDevices, and ConsumableCapacity features. features: Features{ diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go index 4540e795095..5f388736a65 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/experimental/allocator_experimental.go @@ -1650,7 +1650,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error } // Devices that aren't allocated doesn't consume any counters, so we don't // need to consider them. - if !alloc.allocatedState.AllocatedDevices.Has(deviceID) { + if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) { continue } for _, deviceCounterConsumption := range device.ConsumesCounters { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go index 0a9013320b0..c23708ac291 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/internal/incubating/allocator_incubating.go @@ -1522,7 +1522,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error } // Devices that aren't allocated doesn't consume any counters, so we don't // need to consider them. - if !alloc.allocatedState.AllocatedDevices.Has(deviceID) { + if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) { continue } for _, deviceCounterConsumption := range device.ConsumesCounters {