DRA: account for shared devices when rebuilding counters

When DRAConsumableCapacity is enabled, multi-allocatable device
 allocations are tracked via AllocatedSharedDeviceIDs and
 AggregatedCapacity instead of AllocatedDevices. The incubating and
 experimental allocators only checked AllocatedDevices when rebuilding
 available SharedCounters, so committed shared allocations could be
 missed across scheduling cycles.

 Add an internal IsDeviceAllocated helper that checks all committed
 allocation-state sources, and use it when reconstructing consumed
 counters. Add a regression test for a fresh allocator seeing committed
 shared-device counter consumption.
This commit is contained in:
Ashvin Deodhar 2026-05-13 06:39:29 -07:00
parent bd854a9dd5
commit d2c2de72e1
5 changed files with 81 additions and 29 deletions

View file

@ -260,32 +260,7 @@ func NodeMatches(features Features, node *v1.Node, nodeNameToMatch string, allNo
return false, fmt.Errorf("internal error: no NodeMatches implementation available for feature set %v", features)
}
// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices
// and partially consumed devices when consumable capacity is enabled.
// IsDeviceAllocated checks if a device has committed allocation state.
func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool {
// Check if device is fully allocated (traditional case)
if allocatedState.AllocatedDevices.Has(deviceID) {
return true
}
// Check if device is partially consumed via shared allocations (consumable capacity case).
// We need to check if any shared device ID corresponds to our device.
for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs {
// Extract the base device ID from the shared device ID by recreating it
baseDeviceID := MakeDeviceID(
sharedDeviceID.Driver.String(),
sharedDeviceID.Pool.String(),
sharedDeviceID.Device.String(),
)
if baseDeviceID == deviceID {
return true
}
}
// Check if device has consumed capacity tracked (consumable capacity case)
if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity {
return true
}
return false
return internal.IsDeviceAllocated(deviceID, allocatedState)
}

View file

@ -58,6 +58,32 @@ func NewDeviceConsumedCapacity(deviceID DeviceID, consumedCapacity map[resourcea
return schedulerapi.NewDeviceConsumedCapacity(deviceID, consumedCapacity)
}
// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices
// and partially consumed devices when consumable capacity is enabled.
func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool {
// Check if device is fully allocated (traditional case).
if allocatedState.AllocatedDevices.Has(deviceID) {
return true
}
// Check if device is partially consumed via shared allocations (consumable capacity case).
// We need to check if any shared device ID corresponds to our device.
for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs {
if sharedDeviceID.GetDeviceID() == deviceID {
return true
}
}
// For scheduler-generated state, consumed capacity is recorded together with
// a shared device ID. Keep this check to preserve IsDeviceAllocated semantics
// and handle manually constructed or future AllocatedState producers.
if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity {
return true
}
return false
}
// GenerateShareID is a helper function that generates a new share ID.
// This remains in the internal package as it's a utility function.
func GenerateShareID() *types.UID {

View file

@ -5425,6 +5425,57 @@ func TestAllocator(t *testing.T,
deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}),
)},
},
"allow-multiple-allocations-with-shared-device-counter-cross-cycle": {
features: Features{
PartitionableDevices: true,
ConsumableCapacity: true,
},
claimsToAllocate: objects(
claim(claim0).withRequests(
deviceRequest(req0, classA, 1).
withCapacityRequest(ptr.To(one)).
withSelectors(resourceapi.DeviceSelector{
CEL: &resourceapi.CELDeviceSelector{
Expression: fmt.Sprintf(`device.attributes["%s"].mode == "b"`, driverA),
},
}),
),
),
allocatedSharedDeviceIDs: sets.New(
internal.MakeSharedDeviceID(MakeDeviceID(driverA, pool1, device1), &fixedShareID),
),
allocatedCapacityDevices: ConsumedCapacityCollection{
MakeDeviceID(driverA, pool1, device1): ConsumedCapacity{
capacity0: ptr.To(one),
},
},
classes: objects(classWithAllowMultipleAllocations(classA, driverA, true)),
slices: unwrapResourceSlices(
sliceWithDevices(slice1, node1, resourcePool(pool1, 2), driverA,
device(device1, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"mode": {StringValue: ptr.To("a")},
}).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1, map[string]resource.Quantity{
capacity0: one,
}),
).withAllowMultipleAllocations(),
device(device2, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"mode": {StringValue: ptr.To("b")},
}).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1, map[string]resource.Quantity{
capacity0: one,
}),
).withAllowMultipleAllocations(),
),
sliceWithCounterSets(slice2, node1, resourcePool(pool1, 2), driverA,
counterSet(counterSet1, map[string]resource.Quantity{
capacity0: one,
}),
),
),
node: node(node1, region1),
expectResults: []any{},
},
"consumable-capacity-with-partitionable-device-multiple-capacity-pools": {
// This test case combines integration of PrioritizedList, PartitionableDevices, and ConsumableCapacity features.
features: Features{

View file

@ -1650,7 +1650,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error
}
// Devices that aren't allocated doesn't consume any counters, so we don't
// need to consider them.
if !alloc.allocatedState.AllocatedDevices.Has(deviceID) {
if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) {
continue
}
for _, deviceCounterConsumption := range device.ConsumesCounters {

View file

@ -1522,7 +1522,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error
}
// Devices that aren't allocated doesn't consume any counters, so we don't
// need to consider them.
if !alloc.allocatedState.AllocatedDevices.Has(deviceID) {
if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) {
continue
}
for _, deviceCounterConsumption := range device.ConsumesCounters {