Merge pull request #139040 from ashvindeodhar/bug-dra-consumescounters-multiallocatable

DRA: account for shared devices when rebuilding counters
This commit is contained in:
Kubernetes Prow Robot 2026-05-20 18:31:46 +05:30 committed by GitHub
commit fabbbc8b15
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 81 additions and 29 deletions

View file

@ -260,32 +260,7 @@ func NodeMatches(features Features, node *v1.Node, nodeNameToMatch string, allNo
return false, fmt.Errorf("internal error: no NodeMatches implementation available for feature set %v", features)
}
// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices
// and partially consumed devices when consumable capacity is enabled.
// IsDeviceAllocated checks if a device has committed allocation state.
func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool {
// Check if device is fully allocated (traditional case)
if allocatedState.AllocatedDevices.Has(deviceID) {
return true
}
// Check if device is partially consumed via shared allocations (consumable capacity case).
// We need to check if any shared device ID corresponds to our device.
for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs {
// Extract the base device ID from the shared device ID by recreating it
baseDeviceID := MakeDeviceID(
sharedDeviceID.Driver.String(),
sharedDeviceID.Pool.String(),
sharedDeviceID.Device.String(),
)
if baseDeviceID == deviceID {
return true
}
}
// Check if device has consumed capacity tracked (consumable capacity case)
if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity {
return true
}
return false
return internal.IsDeviceAllocated(deviceID, allocatedState)
}

View file

@ -58,6 +58,32 @@ func NewDeviceConsumedCapacity(deviceID DeviceID, consumedCapacity map[resourcea
return schedulerapi.NewDeviceConsumedCapacity(deviceID, consumedCapacity)
}
// IsDeviceAllocated checks if a device is allocated, considering both fully allocated devices
// and partially consumed devices when consumable capacity is enabled.
func IsDeviceAllocated(deviceID DeviceID, allocatedState *AllocatedState) bool {
// Check if device is fully allocated (traditional case).
if allocatedState.AllocatedDevices.Has(deviceID) {
return true
}
// Check if device is partially consumed via shared allocations (consumable capacity case).
// We need to check if any shared device ID corresponds to our device.
for sharedDeviceID := range allocatedState.AllocatedSharedDeviceIDs {
if sharedDeviceID.GetDeviceID() == deviceID {
return true
}
}
// For scheduler-generated state, consumed capacity is recorded together with
// a shared device ID. Keep this check to preserve IsDeviceAllocated semantics
// and handle manually constructed or future AllocatedState producers.
if _, hasConsumedCapacity := allocatedState.AggregatedCapacity[deviceID]; hasConsumedCapacity {
return true
}
return false
}
// GenerateShareID is a helper function that generates a new share ID.
// This remains in the internal package as it's a utility function.
func GenerateShareID() *types.UID {

View file

@ -5425,6 +5425,57 @@ func TestAllocator(t *testing.T,
deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}),
)},
},
"allow-multiple-allocations-with-shared-device-counter-cross-cycle": {
features: Features{
PartitionableDevices: true,
ConsumableCapacity: true,
},
claimsToAllocate: objects(
claim(claim0).withRequests(
deviceRequest(req0, classA, 1).
withCapacityRequest(ptr.To(one)).
withSelectors(resourceapi.DeviceSelector{
CEL: &resourceapi.CELDeviceSelector{
Expression: fmt.Sprintf(`device.attributes["%s"].mode == "b"`, driverA),
},
}),
),
),
allocatedSharedDeviceIDs: sets.New(
internal.MakeSharedDeviceID(MakeDeviceID(driverA, pool1, device1), &fixedShareID),
),
allocatedCapacityDevices: ConsumedCapacityCollection{
MakeDeviceID(driverA, pool1, device1): ConsumedCapacity{
capacity0: ptr.To(one),
},
},
classes: objects(classWithAllowMultipleAllocations(classA, driverA, true)),
slices: unwrapResourceSlices(
sliceWithDevices(slice1, node1, resourcePool(pool1, 2), driverA,
device(device1, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"mode": {StringValue: ptr.To("a")},
}).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1, map[string]resource.Quantity{
capacity0: one,
}),
).withAllowMultipleAllocations(),
device(device2, fromCounters, map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"mode": {StringValue: ptr.To("b")},
}).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1, map[string]resource.Quantity{
capacity0: one,
}),
).withAllowMultipleAllocations(),
),
sliceWithCounterSets(slice2, node1, resourcePool(pool1, 2), driverA,
counterSet(counterSet1, map[string]resource.Quantity{
capacity0: one,
}),
),
),
node: node(node1, region1),
expectResults: []any{},
},
"consumable-capacity-with-partitionable-device-multiple-capacity-pools": {
// This test case combines integration of PrioritizedList, PartitionableDevices, and ConsumableCapacity features.
features: Features{

View file

@ -1650,7 +1650,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error
}
// Devices that aren't allocated doesn't consume any counters, so we don't
// need to consider them.
if !alloc.allocatedState.AllocatedDevices.Has(deviceID) {
if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) {
continue
}
for _, deviceCounterConsumption := range device.ConsumesCounters {

View file

@ -1522,7 +1522,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error
}
// Devices that aren't allocated doesn't consume any counters, so we don't
// need to consider them.
if !alloc.allocatedState.AllocatedDevices.Has(deviceID) {
if !internal.IsDeviceAllocated(deviceID, &alloc.allocatedState) {
continue
}
for _, deviceCounterConsumption := range device.ConsumesCounters {