Merge pull request #137897 from nojnhuh/dra-gang

scheduler: fix race in DRA pending allocation sharing
This commit is contained in:
Kubernetes Prow Robot 2026-03-24 23:40:18 +05:30 committed by GitHub
commit da97d71f14
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 406 additions and 372 deletions

View file

@ -148,26 +148,18 @@ func (r *resourceClaimTrackerContract) GatherAllocatedState() (*schedulerapi.All
return nil, nil
}
func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) *resourceapi.AllocationResult {
return nil
}
func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error {
return nil
}
func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) (*resourceapi.AllocationResult, bool) {
return nil, false
}
func (r *resourceClaimTrackerContract) MaybeRemoveClaimPendingAllocation(_ types.UID, _ bool) (deleted bool) {
return false
}
func (r *resourceClaimTrackerContract) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
return nil
}
func (r *resourceClaimTrackerContract) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
return nil
}
func (r *resourceClaimTrackerContract) AssumeClaimAfterAPICall(_ *resourceapi.ResourceClaim) error {
return nil
}

View file

@ -41,11 +41,11 @@ type CycleState struct {
// GetParallelPreBindPlugins returns plugins that can be run in parallel with other plugins
// in the PreBind extension point.
parallelPreBindPlugins sets.Set[string]
// isPodGroupSchedulingCycle indicates whether this cycle is a pod group scheduling cycle or not.
// If set to false, it means that the pod referencing this CycleState either passed the pod group cycle
// podGroupCycleState contains the CycleState for this pod's PodGroup.
// If set to nil, it means that the pod referencing this CycleState either passed the pod group cycle
// or doesn't belong to any pod group.
// This field can only be set to true when GenericWorkload feature flag is enabled.
isPodGroupSchedulingCycle bool
// This field can only be non-nil when GenericWorkload feature flag is enabled.
podGroupCycleState fwk.PodGroupCycleState
}
// NewCycleState initializes a new CycleState and returns its pointer.
@ -102,11 +102,15 @@ func (c *CycleState) GetParallelPreBindPlugins() sets.Set[string] {
}
func (c *CycleState) IsPodGroupSchedulingCycle() bool {
return c.isPodGroupSchedulingCycle
return c.podGroupCycleState != nil
}
func (c *CycleState) SetPodGroupSchedulingCycle(isPodGroupSchedulingCycle bool) {
c.isPodGroupSchedulingCycle = isPodGroupSchedulingCycle
func (c *CycleState) SetPodGroupSchedulingCycle(podGroupCycleState fwk.PodGroupCycleState) {
c.podGroupCycleState = podGroupCycleState
}
func (c *CycleState) GetPodGroupSchedulingCycle() fwk.PodGroupCycleState {
return c.podGroupCycleState
}
func (c *CycleState) SetSkipAllPostFilterPlugins(flag bool) {
@ -135,7 +139,7 @@ func (c *CycleState) Clone() fwk.CycleState {
copy.skipScorePlugins = c.skipScorePlugins
copy.skipPreBindPlugins = c.skipPreBindPlugins
copy.parallelPreBindPlugins = c.parallelPreBindPlugins
copy.isPodGroupSchedulingCycle = c.isPodGroupSchedulingCycle
copy.podGroupCycleState = c.podGroupCycleState
copy.skipAllPostFilterPlugins = c.skipAllPostFilterPlugins
return copy

View file

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"slices"
"sync"
@ -63,7 +64,7 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re
manager := &DefaultDRAManager{
resourceClaimTracker: &claimTracker{
cache: claimsCache,
inFlightAllocations: &sync.Map{},
inFlightAllocations: make(map[types.UID]inFlightAllocation),
allocatedDevices: newAllocatedDevices(logger),
logger: logger,
},
@ -74,9 +75,6 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re
if utilfeature.DefaultFeatureGate.Enabled(features.DRAExtendedResource) {
manager.extendedResourceCache = extendedresourcecache.NewExtendedResourceCache(logger)
}
if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
manager.resourceClaimTracker.inFlightAllocationSharers = &sync.Map{}
}
pgLister := &podGroupLister{}
if utilfeature.DefaultFeatureGate.Enabled(features.DRAWorkloadResourceClaims) {
@ -178,13 +176,16 @@ type claimTracker struct {
// - would make integration with cluster autoscaler harder because it would need
// to trigger informer callbacks.
cache *assumecache.AssumeCache
// inFlightMutex syncs access to inFlightAllocations.
inFlightMutex sync.RWMutex
// inFlightAllocations is a map from claim UUIDs to claim objects for those claims
// for which allocation was triggered during a scheduling cycle and the
// corresponding claim status update call in PreBind has not been done
// yet. If another pod needs the claim, the pod is treated as "not
// schedulable yet" unless the pod is a member of a PodGroup. For ungrouped
// pods, the cluster event for the claim status update will make it
// schedulable.
// yet. It also includes a reference count tracking how many actively
// scheduling Pods in a PodGroup are using that pending allocation. If
// another pod outside the PodGroup needs the claim, the pod is treated as
// "not schedulable yet". For those pods, the cluster event for the
// claim status update will make them schedulable.
//
// This mechanism avoids the following problem:
// - Pod A triggers allocation for claim X.
@ -215,25 +216,45 @@ type claimTracker struct {
// pods is expected to be rare compared to per-pod claim, so we end up
// hitting the "multiple goroutines read, write, and overwrite entries
// for disjoint sets of keys" case that sync.Map is optimized for.
inFlightAllocations *sync.Map
// inFlightAllocationSharers counts the actively scheduling pods
// sharing a given ResourceClaim.
inFlightAllocationSharers *sync.Map
allocatedDevices *allocatedDevices
logger klog.Logger
inFlightAllocations map[types.UID]inFlightAllocation
allocatedDevices *allocatedDevices
logger klog.Logger
}
func (c *claimTracker) GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool) {
var allocation *resourceapi.AllocationResult
claim, found := c.inFlightAllocations.Load(claimUID)
if found && claim != nil {
allocation = claim.(*resourceapi.ResourceClaim).Status.Allocation
type inFlightAllocation struct {
claim *resourceapi.ResourceClaim
sharers int
}
func (c *claimTracker) GetPendingAllocation(claimUID types.UID) *resourceapi.AllocationResult {
c.inFlightMutex.RLock()
defer c.inFlightMutex.RUnlock()
inFlight, found := c.inFlightAllocations[claimUID]
if !found || inFlight.claim == nil {
return nil
}
return allocation, found
return inFlight.claim.Status.Allocation
}
func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
c.inFlightAllocations.Store(claimUID, allocatedClaim)
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
inFlight, found := c.inFlightAllocations[claimUID]
if found {
inFlight.sharers++
c.inFlightAllocations[claimUID] = inFlight
claim := inFlight.claim
c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers)
return nil
}
c.inFlightAllocations[claimUID] = inFlightAllocation{
claim: allocatedClaim,
sharers: 1,
}
// This is the same verbosity as the corresponding log in the assume cache.
c.logger.V(5).Info("Added in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion)
// There's no reason to return an error in this implementation, but the error is helpful for other implementations.
@ -242,67 +263,28 @@ func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocate
return nil
}
func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool) {
if c.inFlightAllocationSharers != nil && shareable {
value, ok := c.inFlightAllocationSharers.Load(claimUID)
if ok && value.(int) > 0 {
if loggerV := c.logger.V(5); loggerV.Enabled() {
claim, found := c.inFlightAllocations.Load(claimUID)
if found {
claim := claim.(*resourceapi.ResourceClaim)
c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
}
}
return false
}
}
func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, forceRemove bool) (deleted bool) {
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
claim, found := c.inFlightAllocations.LoadAndDelete(claimUID)
inFlight, found := c.inFlightAllocations[claimUID]
// The assume cache doesn't log this, but maybe it should.
if found {
claim := claim.(*resourceapi.ResourceClaim)
c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
} else {
if !found {
c.logger.V(5).Info("Redundant remove of in-flight claim, not found", "uid", claimUID)
return false
}
return found
}
claim := inFlight.claim
func (c *claimTracker) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
newSharers := 1
value, loaded := c.inFlightAllocationSharers.LoadOrStore(claimUID, newSharers)
if loaded {
oldSharers := value.(int)
newSharers = oldSharers + 1
swapped := c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers)
if !swapped {
// The value must have changed since we loaded
return fmt.Errorf("conflict adding in-flight allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID)
}
if forceRemove || inFlight.sharers == 1 {
delete(c.inFlightAllocations, claimUID)
c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
return true
}
c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers)
return nil
}
func (c *claimTracker) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
value, ok := c.inFlightAllocationSharers.Load(claimUID)
if !ok {
return nil
}
oldSharers := value.(int)
newSharers := oldSharers - 1
var written bool
if newSharers == 0 {
written = c.inFlightAllocationSharers.CompareAndDelete(claimUID, oldSharers)
} else {
written = c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers)
}
if !written {
// The value must have changed since we loaded
return fmt.Errorf("conflict removing in-flight allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID)
}
c.logger.V(5).Info("Removed share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers)
return nil
inFlight.sharers--
c.inFlightAllocations[claimUID] = inFlight
c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers)
return false
}
func (c *claimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
@ -360,14 +342,13 @@ func (c *claimTracker) ListAllAllocatedDevices() (a sets.Set[structured.DeviceID
allocated, revision := c.allocatedDevices.Get()
// Whatever is in flight also has to be checked.
c.inFlightAllocations.Range(func(key, value any) bool {
claim := value.(*resourceapi.ResourceClaim)
for _, inFlight := range c.allInFlightAllocationsRLocked() {
claim := inFlight.claim
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
allocated.Insert(deviceID)
}, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {})
return true
})
}
if revision == c.allocatedDevices.Revision() {
// Our current result is valid, nothing changed in the meantime.
@ -433,8 +414,8 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
}
// Whatever is in flight also has to be checked.
c.inFlightAllocations.Range(func(key, value any) bool {
claim := value.(*resourceapi.ResourceClaim)
for _, inFlight := range c.allInFlightAllocationsRLocked() {
claim := inFlight.claim
foreachAllocatedDevice(claim,
func(deviceID structured.DeviceID) { // dedicatedDeviceCallback
c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
@ -449,9 +430,7 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
c.logger.V(6).Info("Device is in flight for allocation", "consumed capacity", capacity, "claim", klog.KObj(claim))
aggregatedCapacity.Insert(capacity)
})
return true
})
}
if revision1 == c.allocatedDevices.Revision() {
// Our current result is valid, nothing changed in the meantime.
return &structured.AllocatedState{
@ -464,6 +443,18 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
return nil, errClaimTrackerConcurrentModification
}
func (c *claimTracker) allInFlightAllocationsRLocked() iter.Seq2[types.UID, inFlightAllocation] {
return func(yield func(types.UID, inFlightAllocation) bool) {
c.inFlightMutex.RLock()
defer c.inFlightMutex.RUnlock()
for uid, inFlight := range c.inFlightAllocations {
if !yield(uid, inFlight) {
return
}
}
}
}
func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
return c.cache.Assume(claim)
}

View file

@ -17,7 +17,7 @@ limitations under the License.
package dynamicresources
import (
"sync"
"maps"
"testing"
"github.com/onsi/gomega"
@ -31,33 +31,33 @@ const (
otherClaimUID = types.UID("claim-uid-2")
)
func TestAddSharedClaimPendingAllocation(t *testing.T) {
testAddSharedClaimPendingAllocation(ktesting.Init(t))
func TestSignalClaimPendingAllocation(t *testing.T) {
testSignalClaimPendingAllocation(ktesting.Init(t))
}
func testAddSharedClaimPendingAllocation(tCtx ktesting.TContext) {
func testSignalClaimPendingAllocation(tCtx ktesting.TContext) {
tests := map[string]struct {
inFlightAllocationSharers map[types.UID]int
claimUID types.UID
allocatedClaim *resourceapi.ResourceClaim
expectedInFlightAllocationSharers map[types.UID]int
inFlightAllocations map[types.UID]inFlightAllocation
claimUID types.UID
allocatedClaim *resourceapi.ResourceClaim
expectedInFlightAllocations map[types.UID]inFlightAllocation
}{
"empty": {
claimUID: claimUID,
allocatedClaim: allocatedClaim,
expectedInFlightAllocationSharers: map[types.UID]int{
claimUID: 1,
expectedInFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 1},
},
},
"increment": {
inFlightAllocationSharers: map[types.UID]int{
claimUID: 1,
otherClaimUID: 10,
"already-exists": {
inFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 1},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
claimUID: claimUID,
allocatedClaim: allocatedClaim,
expectedInFlightAllocationSharers: map[types.UID]int{
claimUID: 2,
otherClaimUID: 10,
expectedInFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 2},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
},
}
@ -65,79 +65,61 @@ func testAddSharedClaimPendingAllocation(tCtx ktesting.TContext) {
for name, test := range tests {
tCtx.Run(name, func(tCtx ktesting.TContext) {
c := &claimTracker{
logger: tCtx.Logger(),
inFlightAllocationSharers: &sync.Map{},
logger: tCtx.Logger(),
inFlightAllocations: make(map[types.UID]inFlightAllocation),
}
maps.Copy(c.inFlightAllocations, test.inFlightAllocations)
for key, value := range test.inFlightAllocationSharers {
c.inFlightAllocationSharers.Store(key, value)
}
err := c.AddSharedClaimPendingAllocation(test.claimUID, test.allocatedClaim)
err := c.SignalClaimPendingAllocation(test.claimUID, test.allocatedClaim)
tCtx.ExpectNoError(err)
actual := map[types.UID]int{}
c.inFlightAllocationSharers.Range(func(key, value any) bool {
actual[key.(types.UID)] = value.(int)
return true
})
tCtx.Expect(actual).To(gomega.Equal(test.expectedInFlightAllocationSharers))
tCtx.Expect(c.inFlightAllocations).To(gomega.Equal(test.expectedInFlightAllocations))
})
}
}
func TestRemoveSharedClaimPendingAllocation(t *testing.T) {
testRemoveSharedClaimPendingAllocation(ktesting.Init(t))
func TestGetPendingAllocation(t *testing.T) {
testGetPendingAllocation(ktesting.Init(t))
}
func testRemoveSharedClaimPendingAllocation(tCtx ktesting.TContext) {
func testGetPendingAllocation(tCtx ktesting.TContext) {
tests := map[string]struct {
inFlightAllocationSharers map[types.UID]int
claimUID types.UID
allocatedClaim *resourceapi.ResourceClaim
expectedInFlightAllocationSharers map[types.UID]int
expectedErr string
inFlightAllocations map[types.UID]inFlightAllocation
claimUID types.UID
expected *resourceapi.AllocationResult
}{
"empty": {
inFlightAllocationSharers: map[types.UID]int{
claimUID: 1,
otherClaimUID: 10,
},
claimUID: claimUID,
allocatedClaim: allocatedClaim,
expectedInFlightAllocationSharers: map[types.UID]int{
otherClaimUID: 10,
},
claimUID: claimUID,
expected: nil,
},
"decrement": {
inFlightAllocationSharers: map[types.UID]int{
claimUID: 2,
otherClaimUID: 10,
"nil-claim": {
inFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: nil},
},
claimUID: claimUID,
allocatedClaim: allocatedClaim,
expectedInFlightAllocationSharers: map[types.UID]int{
claimUID: 1,
otherClaimUID: 10,
claimUID: claimUID,
expected: nil,
},
"claim": {
inFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 1},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
claimUID: claimUID,
expected: allocationResult,
},
}
for name, test := range tests {
tCtx.Run(name, func(tCtx ktesting.TContext) {
c := &claimTracker{
logger: tCtx.Logger(),
inFlightAllocationSharers: &sync.Map{},
logger: tCtx.Logger(),
inFlightAllocations: make(map[types.UID]inFlightAllocation),
}
maps.Copy(c.inFlightAllocations, test.inFlightAllocations)
beforeInFlight := maps.Clone(c.inFlightAllocations)
for key, value := range test.inFlightAllocationSharers {
c.inFlightAllocationSharers.Store(key, value)
}
err := c.RemoveSharedClaimPendingAllocation(test.claimUID, test.allocatedClaim)
tCtx.ExpectNoError(err)
actual := map[types.UID]int{}
c.inFlightAllocationSharers.Range(func(key, value any) bool {
actual[key.(types.UID)] = value.(int)
return true
})
tCtx.Expect(actual).To(gomega.Equal(test.expectedInFlightAllocationSharers))
actual := c.GetPendingAllocation(test.claimUID)
tCtx.Expect(actual).To(gomega.Equal(test.expected))
// Get is strictly read-only
tCtx.Expect(c.inFlightAllocations).To(gomega.Equal(beforeInFlight))
})
}
}
@ -147,62 +129,63 @@ func TestMaybeRemoveClaimPendingAllocation(t *testing.T) {
}
func testMaybeRemoveClaimPendingAllocation(tCtx ktesting.TContext) {
tests := map[string]struct {
inFlightAllocations map[types.UID]*resourceapi.ResourceClaim
inFlightAllocationSharers map[types.UID]int
claimUID types.UID
shareable bool
expected bool
expectedInFlightAllocationSharers map[types.UID]int
expectedInFlightAllocations map[types.UID]*resourceapi.ResourceClaim
inFlightAllocations map[types.UID]inFlightAllocation
claimUID types.UID
forceRemove bool
expected bool
expectedInFlightAllocations map[types.UID]inFlightAllocation
}{
"empty": {
claimUID: claimUID,
shareable: true,
expected: false,
expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{},
expectedInFlightAllocationSharers: map[types.UID]int{},
},
"delete-existing-not-shareable": {
inFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{
claimUID: allocatedClaim,
},
claimUID: claimUID,
expected: true,
expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{},
expectedInFlightAllocationSharers: map[types.UID]int{},
},
"delete-existing-shareable-unshared": {
inFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{
claimUID: allocatedClaim,
},
inFlightAllocationSharers: map[types.UID]int{
otherClaimUID: 10,
},
claimUID: claimUID,
shareable: true,
expected: true,
expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{},
expectedInFlightAllocationSharers: map[types.UID]int{
otherClaimUID: 10,
forceRemove: true,
expected: false,
expectedInFlightAllocations: map[types.UID]inFlightAllocation{},
},
"delete-last-sharer": {
inFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 1},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
claimUID: claimUID,
expected: true,
expectedInFlightAllocations: map[types.UID]inFlightAllocation{
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
},
"keep-existing-shareable-shared": {
inFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{
claimUID: allocatedClaim,
"force-delete-last-sharer": {
inFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 1},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
inFlightAllocationSharers: map[types.UID]int{
claimUID: 1,
otherClaimUID: 0,
claimUID: claimUID,
forceRemove: true,
expected: true,
expectedInFlightAllocations: map[types.UID]inFlightAllocation{
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
claimUID: claimUID,
shareable: true,
expected: false,
expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{
claimUID: allocatedClaim,
},
"decrement-remaining-sharers": {
inFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 2},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
expectedInFlightAllocationSharers: map[types.UID]int{
claimUID: 1,
otherClaimUID: 0,
claimUID: claimUID,
expected: false,
expectedInFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 1},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
},
"force-delete-remaining-sharers": {
inFlightAllocations: map[types.UID]inFlightAllocation{
claimUID: {claim: allocatedClaim, sharers: 2},
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
claimUID: claimUID,
forceRemove: true,
expected: true,
expectedInFlightAllocations: map[types.UID]inFlightAllocation{
otherClaimUID: {claim: allocatedClaim2, sharers: 10},
},
},
}
@ -210,31 +193,14 @@ func testMaybeRemoveClaimPendingAllocation(tCtx ktesting.TContext) {
for name, test := range tests {
tCtx.Run(name, func(tCtx ktesting.TContext) {
c := &claimTracker{
logger: tCtx.Logger(),
inFlightAllocations: &sync.Map{},
inFlightAllocationSharers: &sync.Map{},
logger: tCtx.Logger(),
inFlightAllocations: make(map[types.UID]inFlightAllocation),
}
maps.Copy(c.inFlightAllocations, test.inFlightAllocations)
for key, value := range test.inFlightAllocations {
c.inFlightAllocations.Store(key, value)
}
for key, value := range test.inFlightAllocationSharers {
c.inFlightAllocationSharers.Store(key, value)
}
actual := c.MaybeRemoveClaimPendingAllocation(test.claimUID, test.shareable)
actual := c.MaybeRemoveClaimPendingAllocation(test.claimUID, test.forceRemove)
tCtx.Expect(actual).To(gomega.Equal(test.expected), "wrong value for deletion indicator")
actualInFlight := map[types.UID]*resourceapi.ResourceClaim{}
c.inFlightAllocations.Range(func(key, value any) bool {
actualInFlight[key.(types.UID)] = value.(*resourceapi.ResourceClaim)
return true
})
tCtx.Expect(actualInFlight).To(gomega.Equal(test.expectedInFlightAllocations))
actualSharers := map[types.UID]int{}
c.inFlightAllocationSharers.Range(func(key, value any) bool {
actualSharers[key.(types.UID)] = value.(int)
return true
})
tCtx.Expect(actualSharers).To(gomega.Equal(test.expectedInFlightAllocationSharers))
tCtx.Expect(c.inFlightAllocations).To(gomega.Equal(test.expectedInFlightAllocations))
})
}
}

View file

@ -115,11 +115,32 @@ func (d *stateData) Clone() fwk.StateData {
return d
}
// This state is persisted in the PodGroup CycleState. Because we save the
// pointer in fwk.CycleState, in the later phases we don't need to call Write
// method to update the value.
type podGroupStateData struct {
// pendingAllocations stores the UIDs of ResourceClaims that are currently
// pending for *this* PodGroup scheduling cycle. The DRAManager continues to
// track inter-cycle sharing of pending allocations, including the
// allocations themselves and how many Pods are sharing them.
//
// It does not need to be protected by a lock because a PodGroup's
// CycleState is only available during synchronous scheduling phases and is
// cleared before asynchronous phases begin.
pendingAllocations sets.Set[types.UID]
}
func (d *podGroupStateData) Clone() fwk.StateData {
return d
}
type informationForClaim struct {
// Node selector based on the claim status if allocated.
availableOnNodes *nodeaffinity.NodeSelector
// Set by Reserved, published by PreBind, empty if nothing had to be allocated.
// Set by Reserve, published by PreBind, empty if nothing had to be allocated.
// May be set by PreFilter if this allocation was made by a previous Pod's
// Reserve but not yet published in PreBind.
allocation *resourceapi.AllocationResult
}
@ -448,6 +469,11 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
s := &stateData{}
state.Write(stateKey, s)
podGroupState, err := getPodGroupStateData(state)
if err != nil {
return nil, statusError(logger, err)
}
userClaims, err := pl.podResourceClaims(pod)
if err != nil {
return nil, statusUnschedulable(logger, err.Error())
@ -479,22 +505,9 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
}
allocation := claim.Status.Allocation
// Gang scheduling works by waiting for all Pods in a gang to succeed
// through the Reserve phase, then binding all of the Pods. Since a
// claim's allocation is not recorded in [stateData.claims] until
// PreBind, we look for a pending allocation made in Reserve here to
// avoid blocking this Pod on waiting for the allocation to complete.
if pl.isSchedulingPodGroup(pod) && allocation == nil {
if pendingAllocation, found := pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID); found {
allocation = pendingAllocation
logger.V(5).Info("reusing pending allocation", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim), "uid", claim.UID, "allocation", klog.Format(allocation))
}
}
if allocation != nil {
if allocation.NodeSelector != nil {
nodeSelector, err := nodeaffinity.NewNodeSelector(allocation.NodeSelector)
if claim.Status.Allocation != nil {
if claim.Status.Allocation.NodeSelector != nil {
nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.NodeSelector)
if err != nil {
return nil, statusError(logger, err)
}
@ -503,10 +516,38 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
} else {
numClaimsToAllocate++
// Gang scheduling works by waiting for all Pods in a gang to
// succeed through the Reserve phase, then binding all of the Pods.
// Since a claim's allocation is not recorded in [stateData.claims]
// until PreBind, we look for a pending allocation made in Reserve
// here to avoid blocking this Pod on waiting for the allocation to
// complete.
//
// Claims with pending allocations are considered "unallocated" in
// that the claim's Status.Allocation needs to be populated. Claims
// with pending allocations are not input to the allocator, though
// we numClaimsToAllocate increment to create an allocator anyway to
// signal that some claims need their status updated in PreBind.
//
// Currently, sharing is limited to a single PodGroup scheduling
// cycle. Ungrouped Pods and Pods across different PodGroup
// scheduling cycles cannot share claims with pending allocations
// for now to limit potential impact outside of the alpha
// GenericWorkload feature gate. Sharing claims this way more
// broadly may have benefits:
// https://github.com/kubernetes/kubernetes/issues/137932
if podGroupState != nil && podGroupState.pendingAllocations.Has(claim.UID) {
if pendingAllocation := pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID); pendingAllocation != nil {
s.informationsForClaim[index].allocation = pendingAllocation
logger.V(5).Info("reusing pending allocation", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim), "uid", claim.UID, "allocation", klog.Format(pendingAllocation))
continue
}
}
// Allocation in flight? Better wait for that
// to finish, see inFlightAllocations
// documentation for details.
if _, pending := pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID); pending {
if pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID) != nil {
return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim)))
}
@ -672,6 +713,29 @@ func getStateData(cs fwk.CycleState) (*stateData, error) {
return s, nil
}
func getPodGroupStateData(cs fwk.CycleState) (*podGroupStateData, error) {
podGroupCycleState := cs.GetPodGroupSchedulingCycle()
if podGroupCycleState == nil {
return nil, nil
}
state, err := podGroupCycleState.Read(stateKey)
if errors.Is(err, fwk.ErrNotFound) {
podGroupState := &podGroupStateData{
pendingAllocations: make(sets.Set[types.UID]),
}
podGroupCycleState.Write(stateKey, podGroupState)
return podGroupState, nil
}
if err != nil {
return nil, err
}
s, ok := state.(*podGroupStateData)
if !ok {
return nil, errors.New("state is not podGroupStateData")
}
return s, nil
}
// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the resources it requests,
// for both allocated and unallocated claims.
@ -766,11 +830,19 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs fwk.CycleState, pod *
// This replaces the special ResourceClaim for extended resources with one
// matching the node.
claimsToAllocate := make([]*resourceapi.ResourceClaim, 0, state.claims.len())
// pendingResult holds pending allocations that were made for a previous
// Pod in Reserve, but before that allocation is recorded in the status
// by PreBind.
var pendingResult []resourceapi.AllocationResult
extendedResourceClaim := state.claims.extendedResourceClaim()
for _, claim := range state.claims.toAllocate() {
for index, claim := range state.claims.toAllocate() {
if claim == extendedResourceClaim && nodeExtendedResourceClaim != nil {
claim = nodeExtendedResourceClaim
}
if state.informationsForClaim[index].allocation != nil {
pendingResult = append(pendingResult, *state.informationsForClaim[index].allocation)
continue
}
claimsToAllocate = append(claimsToAllocate, claim)
}
allocationResult, err := state.allocator.Allocate(allocCtx, node, claimsToAllocate)
@ -812,13 +884,18 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs fwk.CycleState, pod *
for i, claim := range claimsToAllocate {
allocationsMap[claim.UID] = &allocationResult[i]
}
for i, claim := range state.claims.toAllocate() {
if state.informationsForClaim[i].allocation != nil {
allocationsMap[claim.UID] = state.informationsForClaim[i].allocation
}
}
nodeAllocatableClaimStatus, status = pl.calculateAndCheckNodeAllocatableResources(ctx, state, pod, nodeInfo, allocationsMap)
if status != nil {
return status
}
}
// Reserve uses this information.
allocations = allocationResult
allocations = append(allocationResult, pendingResult...)
}
// Store information in state while holding the mutex.
@ -1078,6 +1155,11 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod
logger := klog.FromContext(ctx)
podGroupState, err := getPodGroupStateData(cs)
if err != nil {
return statusError(logger, err)
}
numClaimsWithAllocator := 0
for _, claim := range state.claims.all() {
if claim.Status.Allocation != nil {
@ -1087,18 +1169,6 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod
// will work and only do it for real during binding. If it fails at
// that time, some other pod was faster and we have to try again.
if pl.isSchedulingPodGroup(pod) && claim.UID != state.claims.getInitialExtendedResourceClaimUID() {
// Inform the tracker that this pod still needs the pending
// allocation. Then PreBind or Unreserve can know whether or not
// it's safe to remove the pending allocation.
//
// Extended resources claims cannot be shared, so we never
// signal sharing for it.
if err := pl.draManager.ResourceClaims().AddSharedClaimPendingAllocation(claim.UID, claim); err != nil {
return statusError(logger, err)
}
}
continue
}
@ -1168,6 +1238,10 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod
}
logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "uid", claim.UID, "resourceVersion", claim.ResourceVersion, "allocation", klog.Format(allocation))
allocIndex++
if podGroupState != nil {
podGroupState.pendingAllocations.Insert(claim.UID)
}
}
}
@ -1187,28 +1261,33 @@ func (pl *DynamicResources) Unreserve(ctx context.Context, cs fwk.CycleState, po
if state.claims.empty() {
return
}
podGroupState, err := getPodGroupStateData(cs)
if err != nil {
return
}
logger := klog.FromContext(ctx)
// we process user claims here first, extendedResourceClaim if any is handled below.
for _, claim := range state.claims.allUserClaims() {
// Since several Pods may be sharing a claim with a pending allocation,
// make sure that we don't remove the pending allocation until the last
// Pod sharing it is Bound or Unreserved.
if pl.isSchedulingPodGroup(pod) {
if err := pl.draManager.ResourceClaims().RemoveSharedClaimPendingAllocation(claim.UID, claim); err != nil {
logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
continue
}
}
// If allocation was in-flight, then it might not be anymore if no pods
// still need the pending allocation. If the allocation was removed from
// in-flight, we need to revert the claim object in the assume cache to
// what it was before.
if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claim.UID, pl.isSchedulingPodGroup(pod)); deleted {
// what it was before. The allocation is not removed until the last Pod
// sharing it is Bound or Unreserved.
if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claim.UID, false); deleted {
logger.V(5).Info("Released resource in allocation result", "claim", klog.KObj(claim), "uid", claim.UID, "resourceVersion", claim.ResourceVersion, "allocation", klog.Format(claim.Status.Allocation))
pl.draManager.ResourceClaims().AssumedClaimRestore(claim.Namespace, claim.Name)
// If we are currently asynchronously Binding Pods in a PodGroup,
// then the pendingAllocations set does not need to be updated. New
// PodGroup scheduling cycles will start with an empty set and not
// share pending allocations started in *this* cycle until Unreserve
// completes for all the Pods sharing that pending allocation and
// they can be Reserved again in another cycle.
if podGroupState != nil {
podGroupState.pendingAllocations.Delete(claim.UID)
}
}
// Ignore claims reserved for the PodGroup because other Pods in the
@ -1264,9 +1343,14 @@ func (pl *DynamicResources) PreBind(ctx context.Context, cs fwk.CycleState, pod
logger := klog.FromContext(ctx)
podGroupState, err := getPodGroupStateData(cs)
if err != nil {
return statusError(logger, err)
}
for index, claim := range state.claims.all() {
if !resourceclaim.IsReservedForPod(pod, claim, pl.fts.EnableDRAWorkloadResourceClaims) {
claim, err := pl.bindClaim(ctx, state, index, pod, nodeName)
claim, err := pl.bindClaim(ctx, state, podGroupState, index, pod, nodeName)
if err != nil {
return statusError(logger, err)
}
@ -1382,7 +1466,7 @@ func (pl *DynamicResources) PreBindPreFlight(ctx context.Context, cs fwk.CycleSt
// bindClaim gets called by PreBind for claim which is not reserved for the pod yet.
// It might not even be allocated. bindClaim then ensures that the allocation
// and reservation are recorded. This finishes the work started in Reserve.
func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (*resourceapi.ResourceClaim, error) {
func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, podGroupState *podGroupStateData, index int, pod *v1.Pod, nodeName string) (*resourceapi.ResourceClaim, error) {
logger := klog.FromContext(ctx)
claim := state.claims.get(index)
binding := state.claims.getBinding(index, pod)
@ -1408,6 +1492,13 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
// The scheduler was handling allocation. Now that has
// completed, either successfully or with a failure.
// The claim should remain in-flight in case any more Pods in the
// PodGroup are still using the pending allocation. If binding failed
// (e.g. due to a conflict in writing the allocation), then we signal
// that this Pod no longer needs the pending allocation without removing it.
forceRemovePendingAllocation := false
if resourceClaimModified {
if isExtendedResourceClaim {
pl.waitForExtendedClaimInAssumeCache(ctx, claim)
@ -1418,22 +1509,28 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
logger.V(5).Info("Claim not stored in assume cache", "err", err, "claim", klog.KObj(claim), "uid", claimUID, "resourceVersion", resourceVersion)
} else {
logger.V(5).Info("Claim stored in assume cache", "claim", klog.KObj(claim), "uid", claimUID, "resourceVersion", resourceVersion)
// Once the pending allocation is recorded in the
// AssumeCache, we no longer need to keep it in-flight.
// Even if it happens to still be shared, the allocation in
// the AssumeCache is authoritative.
forceRemovePendingAllocation = true
}
}
}
if allocation != nil {
for _, claimUID := range claimUIDs {
// Since several Pods may be sharing a claim with a pending
// allocation, make sure that we don't remove the pending
// allocation until the last Pod sharing it is Bound or Unreserved.
if pl.isSchedulingPodGroup(pod) {
if err := pl.draManager.ResourceClaims().RemoveSharedClaimPendingAllocation(claim.UID, claim); err != nil {
logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
continue
if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claimUID, forceRemovePendingAllocation); deleted {
// If we are currently asynchronously Binding Pods in a
// PodGroup, then the pendingAllocations set does not need
// to be updated. New PodGroup scheduling cycles will start
// with an empty set and not share pending allocations
// started in *this* cycle until Unreserve completes for all
// the Pods sharing that pending allocation and they can be
// Reserved again in another cycle.
if podGroupState != nil {
podGroupState.pendingAllocations.Delete(claim.UID)
}
}
if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claimUID, pl.isSchedulingPodGroup(pod)); deleted {
logger.V(5).Info("Removed claim from in-flight claims", "claim", klog.KObj(claim), "uid", claimUID, "resourceVersion", resourceVersion, "allocation", klog.Format(allocation))
}
}
@ -1631,10 +1728,6 @@ func (pl *DynamicResources) isPodReadyForBinding(state *stateData) (bool, error)
return true, nil
}
func (pl *DynamicResources) isSchedulingPodGroup(pod *v1.Pod) bool {
return pl.fts.EnableGenericWorkload && pod.Spec.SchedulingGroup != nil
}
func (pl *DynamicResources) getPodGroup(pod *v1.Pod) (*schedulingapi.PodGroup, error) {
if !pl.fts.EnableDRAWorkloadResourceClaims ||
pod.Spec.SchedulingGroup == nil || pod.Spec.SchedulingGroup.PodGroupName == nil {

View file

@ -3884,10 +3884,9 @@ func (tc *testContext) listAssumedClaims() ([]metav1.Object, []metav1.Object) {
func (tc *testContext) listInFlightClaims() []metav1.Object {
var inFlightClaims []metav1.Object
tc.draManager.resourceClaimTracker.inFlightAllocations.Range(func(key, value any) bool {
inFlightClaims = append(inFlightClaims, value.(*resourceapi.ResourceClaim))
return true
})
for _, inFlight := range tc.draManager.resourceClaimTracker.allInFlightAllocationsRLocked() {
inFlightClaims = append(inFlightClaims, inFlight.claim)
}
sortObjects(inFlightClaims)
return inFlightClaims
}
@ -4968,7 +4967,7 @@ func testGatherAllocatedState(tCtx ktesting.TContext) {
logger := klog.FromContext(tCtx)
draManager := &DefaultDRAManager{
resourceClaimTracker: &claimTracker{
inFlightAllocations: &sync.Map{},
inFlightAllocations: make(map[types.UID]inFlightAllocation),
allocatedDevices: newAllocatedDevices(logger),
},
}

View file

@ -75,30 +75,18 @@ func (m *mockDRAManager) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClai
func (m *mockDRAManager) AssumedClaimRestore(namespace, name string) {}
func (m *mockDRAManager) ClaimHasPendingAllocation(uid types.UID) bool {
return false
func (m *mockDRAManager) GetPendingAllocation(uid types.UID) *resourceapi.AllocationResult {
return nil
}
func (m *mockDRAManager) SignalClaimPendingAllocation(uid types.UID, claim *resourceapi.ResourceClaim) error {
return nil
}
func (m *mockDRAManager) GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool) {
return nil, false
}
func (m *mockDRAManager) MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool) {
func (m *mockDRAManager) MaybeRemoveClaimPendingAllocation(_ types.UID, _ bool) (deleted bool) {
return false
}
func (m *mockDRAManager) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
return nil
}
func (m *mockDRAManager) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
return nil
}
func (m *mockDRAManager) List() ([]*resourceapi.ResourceClaim, error) {
return nil, nil
}

View file

@ -317,7 +317,9 @@ func TestGangSchedulingFlow(t *testing.T) {
fakeActivator.activatedPods = nil
cycleState := schedulerframework.NewCycleState()
cycleState.SetPodGroupSchedulingCycle(tt.isDuringPodGroupSchedulingCycle)
if tt.isDuringPodGroupSchedulingCycle {
cycleState.SetPodGroupSchedulingCycle(cycleState)
}
pod := tt.pod.DeepCopy()
pod.Spec.NodeName = "some-node"

View file

@ -76,7 +76,7 @@ func (sched *Scheduler) scheduleOnePodGroup(ctx context.Context, podGroupInfo *f
logger.V(3).Info("Attempting to schedule pod group", "podGroup", klog.KObj(podGroupInfo))
sched.podGroupCycle(ctx, schedFwk, podGroupInfo)
sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo)
}
// frameworkForPodGroup obtains the concrete scheduler framework for the entire pod group.
@ -182,7 +182,7 @@ type podSchedulingContext struct {
}
// initPodSchedulingContext initializes the scheduling context of a single pod for pod group scheduling cycle.
func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, postFilterMode podGroupPostFilterMode) *podSchedulingContext {
func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, podGroupState *framework.CycleState, postFilterMode podGroupPostFilterMode) *podSchedulingContext {
logger := klog.FromContext(ctx)
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
@ -201,7 +201,7 @@ func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, postFilterMode p
state.Write(framework.PodsToActivateKey, podsToActivate)
// Marks this cycle as a pod group scheduling cycle.
state.SetPodGroupSchedulingCycle(true)
state.SetPodGroupSchedulingCycle(podGroupState)
// Skip post filters if requested.
switch postFilterMode {
@ -219,7 +219,7 @@ func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, postFilterMode p
}
// podGroupCycle runs a pod group scheduling cycle for the given pod group in a single cluster snapshot.
func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo) {
func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) {
// Synchronously attempt to find a fit for the pod group.
start := time.Now()
@ -229,17 +229,17 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
result := podGroupAlgorithmResult{
status: fwk.AsStatus(err),
}
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupInfo, result, start)
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, result, start)
return
}
result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, runAllPostFilters)
result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, runAllPostFilters)
metrics.PodGroupSchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// Run workload aware preemption if required. If the preemption is successful,
// we need to put the pods from pod group back into the scheduling queue.
if sched.workloadAwarePreemptionEnabled && result.status.Code() == fwk.Unschedulable {
status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupInfo)
status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupCycleState, podGroupInfo)
if status.IsSuccess() {
result.waitingOnPreemption = true
} else if status.IsError() {
@ -249,7 +249,7 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
}
}
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupInfo, result, start)
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, result, start)
}
// runWorkloadAwarePreemption runs workload-aware preemption for the given pod group.
@ -259,7 +259,7 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
// original state.
// The function used for evaluating feasibility of pod group scheduling is
// scheduler.podGroupSchedulingAlgorithm run without any post filters.
func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo) *fwk.Status {
func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) *fwk.Status {
// Default preemption should be the only pod group post filter registered plugin.
plugins := schedFwk.PodGroupPostFilterPlugins()
if len(plugins) == 0 {
@ -281,7 +281,7 @@ func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk
defer restoreFn()
return plugins[0].PodGroupPostFilter(ctx, pg, podGroupInfo.UnscheduledPods, func(ctx context.Context) *fwk.Status {
res := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, runWithoutPostFilters)
res := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, runWithoutPostFilters)
return res.status
})
}
@ -353,7 +353,7 @@ type podGroupAlgorithmResult struct {
// It tries to schedule each pod using standard filtering and scoring logic in a fixed order.
// If a pod requires preemption to be schedulable, subsequent pods in the algorithm
// treat that pod as already scheduled on that node with victims being already removed in memory.
func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
result := podGroupAlgorithmResult{
podResults: make([]algorithmResult, 0, len(podGroupInfo.QueuedPodInfos)),
status: fwk.NewStatus(fwk.Unschedulable).WithError(errPodGroupUnschedulable),
@ -365,7 +365,7 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
requiresPreemption := false
for _, podInfo := range podGroupInfo.QueuedPodInfos {
podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, podInfo, postFilterMode)
podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, podInfo, postFilterMode)
result.podResults = append(result.podResults, podResult)
if !podResult.status.IsSuccess() && !podResult.requiresPreemption {
// When a pod is not feasible and doesn't require preemption, it means that it failed scheduling.
@ -403,9 +403,9 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
// podGroupPodSchedulingAlgorithm runs a scheduling algorithm for individual pod from a pod group.
// It returns the algorithm result and, if successful or the preemption is required, the permit status together with the revert function.
func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo, postFilterMode podGroupPostFilterMode) (algorithmResult, func()) {
func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo, postFilterMode podGroupPostFilterMode) (algorithmResult, func()) {
pod := podInfo.Pod
podCtx := initPodSchedulingContext(ctx, pod, postFilterMode)
podCtx := initPodSchedulingContext(ctx, pod, podGroupCycleState, postFilterMode)
logger := podCtx.logger
ctx = klog.NewContext(ctx, logger)
start := time.Now()
@ -492,7 +492,7 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
// for the next pod group scheduling cycle.
// If the preemption is required for this pod group, all pods are moved back to the scheduling queue
// and require the next pod group scheduling cycle to verify the preemption outcome.
func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, podGroupResult podGroupAlgorithmResult, start time.Time) {
func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, schedFwk framework.Framework, podGroupState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podGroupResult podGroupAlgorithmResult, start time.Time) {
logger := klog.FromContext(ctx)
var scheduledPods, unschedulablePods int
@ -504,7 +504,7 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched
// In pod group-level unschedulable or error cases, podResult may not be defined.
// Initialize it now to handle pod failure correctly.
podResult = algorithmResult{
podCtx: initPodSchedulingContext(ctx, pInfo.Pod, runAllPostFilters),
podCtx: initPodSchedulingContext(ctx, pInfo.Pod, podGroupState, runAllPostFilters),
status: podGroupResult.status.Clone(),
}
}
@ -531,7 +531,7 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched
switch {
case podGroupResult.status.IsSuccess():
// Disable pod group scheduling in cycle state before binding.
podCtx.state.SetPodGroupSchedulingCycle(false)
podCtx.state.SetPodGroupSchedulingCycle(nil)
// Schedule result is applied for pod and its binding cycle executes.
assumedPodInfo, status := sched.prepareForBindingCycle(ctx, podCtx.state, schedFwk, pInfo, podCtx.podsToActivate, podResult.scheduleResult)
if !status.IsSuccess() {
@ -648,7 +648,7 @@ func (sched *Scheduler) updatePodGroupCondition(ctx context.Context,
// Placement is a set of nodes that will be considered when scheduling a pod group.
// Then for each placement it tries to schedule the pod group through podGroupSchedulingDefaultAlgorithm.
// Finally, it runs placement scorer plugins to select the best placement.
func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
logger := klog.FromContext(ctx)
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
@ -657,7 +657,6 @@ func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context
}
}
podGroupCycleState := framework.NewCycleState()
// For now, always record plugin metrics until we understand its impact on performance.
podGroupCycleState.SetRecordPluginMetrics(true)
placements, status := schedFwk.RunPlacementGeneratePlugins(ctx, podGroupCycleState, podGroupInfo.PodGroupInfo, allNodes)
@ -678,7 +677,7 @@ func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context
status: fwk.AsStatus(fmt.Errorf("failed to assume pod group placement: %w", err)),
}
}
result := sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupInfo, postFilterMode)
result := sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode)
sched.nodeInfoSnapshot.ForgetPlacement()
if result.status.IsError() {
return result
@ -768,13 +767,13 @@ func makePodGroupAssignments(successfulResults map[*fwk.Placement]*podGroupAlgor
}
// podGroupSchedulingAlgorithm attempts to schedule pods in the pod group according to the policy and constraints and returns the scheduling result for each pod in the pod group.
func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
podGroupCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareWorkloadScheduling) {
return sched.podGroupSchedulingPlacementAlgorithm(podGroupCycleCtx, schedFwk, podGroupInfo, postFilterMode)
return sched.podGroupSchedulingPlacementAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode)
} else {
return sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, podGroupInfo, postFilterMode)
return sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode)
}
}

View file

@ -417,7 +417,7 @@ func TestPodGroupCycle_UpdateSnapshotError(t *testing.T) {
},
}
sched.podGroupCycle(ctx, schedFwk, podGroupInfo)
sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo)
if !failureHandlerCalled {
t.Errorf("Expected FailureHandler to be called after UpdateSnapshot failed")
@ -559,7 +559,7 @@ func TestPodGroupCycle_PodGroupPostFilter(t *testing.T) {
}
sched.SchedulePod = sched.schedulePod
sched.podGroupCycle(ctx, schedFwk, podGroupInfo)
sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo)
if fakePlugin.podGroupPostFilterCalled != tt.expectedPodGroupPostFilterCalled {
t.Errorf("Expected workload aware preemption (PodGroupPostFilter) to be %v, but got %v", tt.expectedPodGroupPostFilterCalled, fakePlugin.podGroupPostFilterCalled)
@ -1020,7 +1020,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
t.Fatalf("Failed to update snapshot: %v", err)
}
result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, runAllPostFilters)
result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, framework.NewCycleState(), podGroupInfo, runAllPostFilters)
if result.status.Code() != tt.expectedGroupStatusCode {
t.Errorf("Expected group status code: %v, got: %v", tt.expectedGroupStatusCode, result.status.Code())
@ -1636,13 +1636,15 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
}
podGroupCycleState := framework.NewCycleState()
for i := range tt.algorithmResult.podResults {
pod := podGroupInfo.QueuedPodInfos[i].Pod
podCtx := initPodSchedulingContext(ctx, pod, runAllPostFilters)
podCtx := initPodSchedulingContext(ctx, pod, podGroupCycleState, runAllPostFilters)
tt.algorithmResult.podResults[i].podCtx = podCtx
}
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupInfo, tt.algorithmResult, time.Now())
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, tt.algorithmResult, time.Now())
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
lock.Lock()
@ -2340,7 +2342,7 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) {
},
}
result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, pgInfo, runAllPostFilters)
result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, framework.NewCycleState(), pgInfo, runAllPostFilters)
opts := cmp.Options{
cmp.AllowUnexported(
@ -2502,7 +2504,7 @@ func TestPodGroupSchedulingPlacementAlgorithm_Scoring(t *testing.T) {
},
}
result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, pgInfo, runAllPostFilters)
result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, framework.NewCycleState(), pgInfo, runAllPostFilters)
expectedHost := placements[tt.expectedPlacement][0]
actualHost := result.podResults[0].scheduleResult.SuggestedHost
@ -2697,7 +2699,7 @@ func TestRunWorkloadAwarePreemption(t *testing.T) {
// Just inject logger explicitly in context to avoid panic
ctx = klog.NewContext(ctx, logger)
status := sched.runWorkloadAwarePreemption(ctx, schedFwk, tt.podGroupInfo)
status := sched.runWorkloadAwarePreemption(ctx, schedFwk, framework.NewCycleState(), tt.podGroupInfo)
if tt.expectedStatus.Code() != status.Code() || tt.expectedStatus.Message() != status.Message() {
t.Errorf("Unexpected status, want code %v message %q, got code %v message %q",

View file

@ -98,9 +98,12 @@ type CycleState interface {
// or doesn't belong to any pod group.
// This field can only be set to true when GenericWorkload feature flag is enabled.
IsPodGroupSchedulingCycle() bool
// SetPodGroupSchedulingCycle sets whether this cycle is a pod group scheduling cycle or not.
// GetPodGroupSchedulingCycle gets the cycle state of the PodGroup for a Pod.
// This should be only used when GenericWorkload feature flag is enabled.
SetPodGroupSchedulingCycle(bool)
GetPodGroupSchedulingCycle() PodGroupCycleState
// SetPodGroupSchedulingCycle sets the cycle state of the PodGroup for a Pod.
// This should be only used when GenericWorkload feature flag is enabled.
SetPodGroupSchedulingCycle(PodGroupCycleState)
}
// PodGroupCycleState provides a mechanism for plugins that operate on pod groups to store and retrieve arbitrary data.

View file

@ -105,25 +105,20 @@ type ResourceClaimTracker interface {
GatherAllocatedState() (*structured.AllocatedState, error)
// SignalClaimPendingAllocation signals to the tracker that the given ResourceClaim will be allocated via an API call in the
// binding phase. This change is immediately reflected in the result of List() and the other accessors.
// binding phase, therefore the given ResourceClaim must be non-nil and have a non-nil Status.Allocation.
// If the claim already has a pending allocation, then the allocation becomes shared. The same number of SignalClaimPendingAllocation() callers
// for a given claimUID is expected to eventually call MaybeRemoveClaimPendingAllocation() for that claimUID.
// This change is immediately reflected in the result of List() and the other accessors.
SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
// GetPendingAllocation answers whether a given claim has a pending allocation during the binding phase. It can be used to avoid
// ClaimHasPendingAllocation answers whether a given claim has a pending allocation during the binding phase. It can be used to avoid
// race conditions in subsequent scheduling phases.
GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool)
GetPendingAllocation(claimUID types.UID) *resourceapi.AllocationResult
// MaybeRemoveClaimPendingAllocation might remove the pending allocation for the given ResourceClaim from the tracker if any was signaled via
// SignalClaimPendingAllocation(). When a pending allocation is `shareable`, it removes the pending allocation only when
// no other pods are still using that pending allocation (per AddSharedClaimPendingAllocation and
// RemoveSharedClaimPendingAllocation). When a pending allocation is not shareable, it always removes the pending
// allocation as long as one exists. Returns whether there was a pending allocation and it was removed.
// List() and the other accessors immediately stop reflecting the pending allocation in the results.
MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool)
// AddSharedClaimPendingAllocation increments the number of active sharers
// of the given ResourceClaim.
AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
// RemoveSharedClaimPendingAllocation decrements the number of active sharers
// of the given ResourceClaim.
RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
// SignalClaimPendingAllocation(). When `forceRemove` is true, it always removes the pending allocation. Otherwise, it removes the pending
// allocation only when no other pods are still using that pending allocation (from SignalClaimPendingAllocation and AcquirePendingAllocation).
// Returns whether there was a pending allocation and it was removed.
// List() and the other accessors immediately stop reflecting the pending allocation in the results when the pending allocation is removed.
MaybeRemoveClaimPendingAllocation(claimUID types.UID, forceRemove bool) (deleted bool)
// AssumeClaimAfterAPICall signals to the tracker that an API call modifying the given ResourceClaim was made in the binding phase, and the
// changes should be reflected in informers very soon. This change is immediately reflected in the result of List() and the other accessors.