From 61cf993c6b0d6fd8f90a689f9674c5fb62c8a089 Mon Sep 17 00:00:00 2001 From: Jon Huhn Date: Thu, 19 Mar 2026 08:42:04 -0500 Subject: [PATCH] scheduler: fix race in DRA pending allocation sharing --- .../lister_contract_test.go | 16 +- pkg/scheduler/framework/cycle_state.go | 20 +- .../plugins/dynamicresources/dra_manager.go | 159 ++++++------ .../dynamicresources/dra_manager_test.go | 242 ++++++++---------- .../dynamicresources/dynamicresources.go | 213 ++++++++++----- .../dynamicresources/dynamicresources_test.go | 9 +- .../nodeallocatabledynamicresources_test.go | 18 +- .../gangscheduling/gangscheduling_test.go | 4 +- pkg/scheduler/schedule_one_podgroup.go | 45 ++-- pkg/scheduler/schedule_one_podgroup_test.go | 18 +- .../kube-scheduler/framework/cycle_state.go | 7 +- .../kube-scheduler/framework/listers.go | 27 +- 12 files changed, 406 insertions(+), 372 deletions(-) diff --git a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go index ed53485c95e..bcb522c9a23 100644 --- a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go +++ b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go @@ -148,26 +148,18 @@ func (r *resourceClaimTrackerContract) GatherAllocatedState() (*schedulerapi.All return nil, nil } +func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) *resourceapi.AllocationResult { + return nil +} + func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error { return nil } -func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) (*resourceapi.AllocationResult, bool) { - return nil, false -} - func (r *resourceClaimTrackerContract) MaybeRemoveClaimPendingAllocation(_ types.UID, _ bool) (deleted bool) { return false } -func (r *resourceClaimTrackerContract) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) 
error { - return nil -} - -func (r *resourceClaimTrackerContract) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error { - return nil -} - func (r *resourceClaimTrackerContract) AssumeClaimAfterAPICall(_ *resourceapi.ResourceClaim) error { return nil } diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go index 2676c263956..31b78725356 100644 --- a/pkg/scheduler/framework/cycle_state.go +++ b/pkg/scheduler/framework/cycle_state.go @@ -41,11 +41,11 @@ type CycleState struct { // GetParallelPreBindPlugins returns plugins that can be run in parallel with other plugins // in the PreBind extension point. parallelPreBindPlugins sets.Set[string] - // isPodGroupSchedulingCycle indicates whether this cycle is a pod group scheduling cycle or not. - // If set to false, it means that the pod referencing this CycleState either passed the pod group cycle + // podGroupCycleState contains the CycleState for this pod's PodGroup. + // If set to nil, it means that the pod referencing this CycleState either passed the pod group cycle // or doesn't belong to any pod group. - // This field can only be set to true when GenericWorkload feature flag is enabled. - isPodGroupSchedulingCycle bool + // This field can only be non-nil when GenericWorkload feature flag is enabled. + podGroupCycleState fwk.PodGroupCycleState } // NewCycleState initializes a new CycleState and returns its pointer. 
@@ -102,11 +102,15 @@ func (c *CycleState) GetParallelPreBindPlugins() sets.Set[string] { } func (c *CycleState) IsPodGroupSchedulingCycle() bool { - return c.isPodGroupSchedulingCycle + return c.podGroupCycleState != nil } -func (c *CycleState) SetPodGroupSchedulingCycle(isPodGroupSchedulingCycle bool) { - c.isPodGroupSchedulingCycle = isPodGroupSchedulingCycle +func (c *CycleState) SetPodGroupSchedulingCycle(podGroupCycleState fwk.PodGroupCycleState) { + c.podGroupCycleState = podGroupCycleState +} + +func (c *CycleState) GetPodGroupSchedulingCycle() fwk.PodGroupCycleState { + return c.podGroupCycleState } func (c *CycleState) SetSkipAllPostFilterPlugins(flag bool) { @@ -135,7 +139,7 @@ func (c *CycleState) Clone() fwk.CycleState { copy.skipScorePlugins = c.skipScorePlugins copy.skipPreBindPlugins = c.skipPreBindPlugins copy.parallelPreBindPlugins = c.parallelPreBindPlugins - copy.isPodGroupSchedulingCycle = c.isPodGroupSchedulingCycle + copy.podGroupCycleState = c.podGroupCycleState copy.skipAllPostFilterPlugins = c.skipAllPostFilterPlugins return copy diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go index 4687e55c4c5..a1703319f6c 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "iter" "slices" "sync" @@ -63,7 +64,7 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re manager := &DefaultDRAManager{ resourceClaimTracker: &claimTracker{ cache: claimsCache, - inFlightAllocations: &sync.Map{}, + inFlightAllocations: make(map[types.UID]inFlightAllocation), allocatedDevices: newAllocatedDevices(logger), logger: logger, }, @@ -74,9 +75,6 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re if utilfeature.DefaultFeatureGate.Enabled(features.DRAExtendedResource) { 
manager.extendedResourceCache = extendedresourcecache.NewExtendedResourceCache(logger) } - if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) { - manager.resourceClaimTracker.inFlightAllocationSharers = &sync.Map{} - } pgLister := &podGroupLister{} if utilfeature.DefaultFeatureGate.Enabled(features.DRAWorkloadResourceClaims) { @@ -178,13 +176,16 @@ type claimTracker struct { // - would make integration with cluster autoscaler harder because it would need // to trigger informer callbacks. cache *assumecache.AssumeCache + // inFlightMutex syncs access to inFlightAllocations. + inFlightMutex sync.RWMutex // inFlightAllocations is a map from claim UUIDs to claim objects for those claims // for which allocation was triggered during a scheduling cycle and the // corresponding claim status update call in PreBind has not been done - // yet. If another pod needs the claim, the pod is treated as "not - // schedulable yet" unless the pod is a member of a PodGroup. For ungrouped - // pods, the cluster event for the claim status update will make it - // schedulable. + // yet. It also includes a reference count tracking how many actively + // scheduling Pods in a PodGroup are using that pending allocation. If + // another pod outside the PodGroup needs the claim, the pod is treated as + // "not schedulable yet". For those pods, the cluster event for the + // claim status update will make them schedulable. // // This mechanism avoids the following problem: // - Pod A triggers allocation for claim X. @@ -215,25 +216,45 @@ type claimTracker struct { // pods is expected to be rare compared to per-pod claim, so we end up // hitting the "multiple goroutines read, write, and overwrite entries // for disjoint sets of keys" case that sync.Map is optimized for. - inFlightAllocations *sync.Map - // inFlightAllocationSharers counts the actively scheduling pods - // sharing a given ResourceClaim. 
- inFlightAllocationSharers *sync.Map - allocatedDevices *allocatedDevices - logger klog.Logger + inFlightAllocations map[types.UID]inFlightAllocation + allocatedDevices *allocatedDevices + logger klog.Logger } -func (c *claimTracker) GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool) { - var allocation *resourceapi.AllocationResult - claim, found := c.inFlightAllocations.Load(claimUID) - if found && claim != nil { - allocation = claim.(*resourceapi.ResourceClaim).Status.Allocation +type inFlightAllocation struct { + claim *resourceapi.ResourceClaim + sharers int +} + +func (c *claimTracker) GetPendingAllocation(claimUID types.UID) *resourceapi.AllocationResult { + c.inFlightMutex.RLock() + defer c.inFlightMutex.RUnlock() + + inFlight, found := c.inFlightAllocations[claimUID] + if !found || inFlight.claim == nil { + return nil } - return allocation, found + return inFlight.claim.Status.Allocation } func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error { - c.inFlightAllocations.Store(claimUID, allocatedClaim) + c.inFlightMutex.Lock() + defer c.inFlightMutex.Unlock() + + inFlight, found := c.inFlightAllocations[claimUID] + if found { + inFlight.sharers++ + c.inFlightAllocations[claimUID] = inFlight + + claim := inFlight.claim + c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers) + return nil + } + + c.inFlightAllocations[claimUID] = inFlightAllocation{ + claim: allocatedClaim, + sharers: 1, + } // This is the same verbosity as the corresponding log in the assume cache. c.logger.V(5).Info("Added in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion) // There's no reason to return an error in this implementation, but the error is helpful for other implementations. 
@@ -242,67 +263,28 @@ func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocate return nil } -func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool) { - if c.inFlightAllocationSharers != nil && shareable { - value, ok := c.inFlightAllocationSharers.Load(claimUID) - if ok && value.(int) > 0 { - if loggerV := c.logger.V(5); loggerV.Enabled() { - claim, found := c.inFlightAllocations.Load(claimUID) - if found { - claim := claim.(*resourceapi.ResourceClaim) - c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion) - } - } - return false - } - } +func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, forceRemove bool) (deleted bool) { + c.inFlightMutex.Lock() + defer c.inFlightMutex.Unlock() - claim, found := c.inFlightAllocations.LoadAndDelete(claimUID) + inFlight, found := c.inFlightAllocations[claimUID] // The assume cache doesn't log this, but maybe it should. 
- if found { - claim := claim.(*resourceapi.ResourceClaim) - c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion) - } else { + if !found { c.logger.V(5).Info("Redundant remove of in-flight claim, not found", "uid", claimUID) + return false } - return found -} + claim := inFlight.claim -func (c *claimTracker) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error { - newSharers := 1 - value, loaded := c.inFlightAllocationSharers.LoadOrStore(claimUID, newSharers) - if loaded { - oldSharers := value.(int) - newSharers = oldSharers + 1 - swapped := c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers) - if !swapped { - // The value must have changed since we loaded - return fmt.Errorf("conflict adding in-flight allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID) - } + if forceRemove || inFlight.sharers == 1 { + delete(c.inFlightAllocations, claimUID) + c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion) + return true } - c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers) - return nil -} -func (c *claimTracker) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error { - value, ok := c.inFlightAllocationSharers.Load(claimUID) - if !ok { - return nil - } - oldSharers := value.(int) - newSharers := oldSharers - 1 - var written bool - if newSharers == 0 { - written = c.inFlightAllocationSharers.CompareAndDelete(claimUID, oldSharers) - } else { - written = c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers) - } - if !written { - // The value must have changed since we loaded - return fmt.Errorf("conflict removing in-flight 
allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID) - } - c.logger.V(5).Info("Removed share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers) - return nil + inFlight.sharers-- + c.inFlightAllocations[claimUID] = inFlight + c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers) + return false } func (c *claimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) { @@ -360,14 +342,13 @@ func (c *claimTracker) ListAllAllocatedDevices() (a sets.Set[structured.DeviceID allocated, revision := c.allocatedDevices.Get() // Whatever is in flight also has to be checked. - c.inFlightAllocations.Range(func(key, value any) bool { - claim := value.(*resourceapi.ResourceClaim) + for _, inFlight := range c.allInFlightAllocationsRLocked() { + claim := inFlight.claim foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim)) allocated.Insert(deviceID) }, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {}) - return true - }) + } if revision == c.allocatedDevices.Revision() { // Our current result is valid, nothing changed in the meantime. @@ -433,8 +414,8 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err } // Whatever is in flight also has to be checked. 
- c.inFlightAllocations.Range(func(key, value any) bool { - claim := value.(*resourceapi.ResourceClaim) + for _, inFlight := range c.allInFlightAllocationsRLocked() { + claim := inFlight.claim foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { // dedicatedDeviceCallback c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim)) @@ -449,9 +430,7 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err c.logger.V(6).Info("Device is in flight for allocation", "consumed capacity", capacity, "claim", klog.KObj(claim)) aggregatedCapacity.Insert(capacity) }) - return true - }) - + } if revision1 == c.allocatedDevices.Revision() { // Our current result is valid, nothing changed in the meantime. return &structured.AllocatedState{ @@ -464,6 +443,18 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err return nil, errClaimTrackerConcurrentModification } +func (c *claimTracker) allInFlightAllocationsRLocked() iter.Seq2[types.UID, inFlightAllocation] { + return func(yield func(types.UID, inFlightAllocation) bool) { + c.inFlightMutex.RLock() + defer c.inFlightMutex.RUnlock() + for uid, inFlight := range c.inFlightAllocations { + if !yield(uid, inFlight) { + return + } + } + } +} + func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error { return c.cache.Assume(claim) } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager_test.go index b93bd5e4db0..8fc8abf325c 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager_test.go @@ -17,7 +17,7 @@ limitations under the License. 
package dynamicresources import ( - "sync" + "maps" "testing" "github.com/onsi/gomega" @@ -31,33 +31,33 @@ const ( otherClaimUID = types.UID("claim-uid-2") ) -func TestAddSharedClaimPendingAllocation(t *testing.T) { - testAddSharedClaimPendingAllocation(ktesting.Init(t)) +func TestSignalClaimPendingAllocation(t *testing.T) { + testSignalClaimPendingAllocation(ktesting.Init(t)) } -func testAddSharedClaimPendingAllocation(tCtx ktesting.TContext) { +func testSignalClaimPendingAllocation(tCtx ktesting.TContext) { tests := map[string]struct { - inFlightAllocationSharers map[types.UID]int - claimUID types.UID - allocatedClaim *resourceapi.ResourceClaim - expectedInFlightAllocationSharers map[types.UID]int + inFlightAllocations map[types.UID]inFlightAllocation + claimUID types.UID + allocatedClaim *resourceapi.ResourceClaim + expectedInFlightAllocations map[types.UID]inFlightAllocation }{ "empty": { claimUID: claimUID, allocatedClaim: allocatedClaim, - expectedInFlightAllocationSharers: map[types.UID]int{ - claimUID: 1, + expectedInFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 1}, }, }, - "increment": { - inFlightAllocationSharers: map[types.UID]int{ - claimUID: 1, - otherClaimUID: 10, + "already-exists": { + inFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 1}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, claimUID: claimUID, allocatedClaim: allocatedClaim, - expectedInFlightAllocationSharers: map[types.UID]int{ - claimUID: 2, - otherClaimUID: 10, + expectedInFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 2}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, }, } @@ -65,79 +65,61 @@ func testAddSharedClaimPendingAllocation(tCtx ktesting.TContext) { for name, test := range tests { tCtx.Run(name, func(tCtx ktesting.TContext) { c := &claimTracker{ - logger: tCtx.Logger(), - inFlightAllocationSharers: 
&sync.Map{}, + logger: tCtx.Logger(), + inFlightAllocations: make(map[types.UID]inFlightAllocation), } + maps.Copy(c.inFlightAllocations, test.inFlightAllocations) - for key, value := range test.inFlightAllocationSharers { - c.inFlightAllocationSharers.Store(key, value) - } - err := c.AddSharedClaimPendingAllocation(test.claimUID, test.allocatedClaim) + err := c.SignalClaimPendingAllocation(test.claimUID, test.allocatedClaim) tCtx.ExpectNoError(err) - actual := map[types.UID]int{} - c.inFlightAllocationSharers.Range(func(key, value any) bool { - actual[key.(types.UID)] = value.(int) - return true - }) - tCtx.Expect(actual).To(gomega.Equal(test.expectedInFlightAllocationSharers)) + tCtx.Expect(c.inFlightAllocations).To(gomega.Equal(test.expectedInFlightAllocations)) }) } } -func TestRemoveSharedClaimPendingAllocation(t *testing.T) { - testRemoveSharedClaimPendingAllocation(ktesting.Init(t)) +func TestGetPendingAllocation(t *testing.T) { + testGetPendingAllocation(ktesting.Init(t)) } -func testRemoveSharedClaimPendingAllocation(tCtx ktesting.TContext) { +func testGetPendingAllocation(tCtx ktesting.TContext) { tests := map[string]struct { - inFlightAllocationSharers map[types.UID]int - claimUID types.UID - allocatedClaim *resourceapi.ResourceClaim - expectedInFlightAllocationSharers map[types.UID]int - expectedErr string + inFlightAllocations map[types.UID]inFlightAllocation + claimUID types.UID + expected *resourceapi.AllocationResult }{ "empty": { - inFlightAllocationSharers: map[types.UID]int{ - claimUID: 1, - otherClaimUID: 10, - }, - claimUID: claimUID, - allocatedClaim: allocatedClaim, - expectedInFlightAllocationSharers: map[types.UID]int{ - otherClaimUID: 10, - }, + claimUID: claimUID, + expected: nil, }, - "decrement": { - inFlightAllocationSharers: map[types.UID]int{ - claimUID: 2, - otherClaimUID: 10, + "nil-claim": { + inFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: nil}, }, - claimUID: claimUID, - allocatedClaim: allocatedClaim, 
- expectedInFlightAllocationSharers: map[types.UID]int{ - claimUID: 1, - otherClaimUID: 10, + claimUID: claimUID, + expected: nil, + }, + "claim": { + inFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 1}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, + claimUID: claimUID, + expected: allocationResult, }, } for name, test := range tests { tCtx.Run(name, func(tCtx ktesting.TContext) { c := &claimTracker{ - logger: tCtx.Logger(), - inFlightAllocationSharers: &sync.Map{}, + logger: tCtx.Logger(), + inFlightAllocations: make(map[types.UID]inFlightAllocation), } + maps.Copy(c.inFlightAllocations, test.inFlightAllocations) + beforeInFlight := maps.Clone(c.inFlightAllocations) - for key, value := range test.inFlightAllocationSharers { - c.inFlightAllocationSharers.Store(key, value) - } - err := c.RemoveSharedClaimPendingAllocation(test.claimUID, test.allocatedClaim) - tCtx.ExpectNoError(err) - actual := map[types.UID]int{} - c.inFlightAllocationSharers.Range(func(key, value any) bool { - actual[key.(types.UID)] = value.(int) - return true - }) - tCtx.Expect(actual).To(gomega.Equal(test.expectedInFlightAllocationSharers)) + actual := c.GetPendingAllocation(test.claimUID) + tCtx.Expect(actual).To(gomega.Equal(test.expected)) + // Get is strictly read-only + tCtx.Expect(c.inFlightAllocations).To(gomega.Equal(beforeInFlight)) }) } } @@ -147,62 +129,63 @@ func TestMaybeRemoveClaimPendingAllocation(t *testing.T) { } func testMaybeRemoveClaimPendingAllocation(tCtx ktesting.TContext) { tests := map[string]struct { - inFlightAllocations map[types.UID]*resourceapi.ResourceClaim - inFlightAllocationSharers map[types.UID]int - claimUID types.UID - shareable bool - expected bool - expectedInFlightAllocationSharers map[types.UID]int - expectedInFlightAllocations map[types.UID]*resourceapi.ResourceClaim + inFlightAllocations map[types.UID]inFlightAllocation + claimUID types.UID + forceRemove bool + expected bool + 
expectedInFlightAllocations map[types.UID]inFlightAllocation }{ "empty": { - claimUID: claimUID, - shareable: true, - expected: false, - expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{}, - expectedInFlightAllocationSharers: map[types.UID]int{}, - }, - "delete-existing-not-shareable": { - inFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{ - claimUID: allocatedClaim, - }, - claimUID: claimUID, - expected: true, - expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{}, - expectedInFlightAllocationSharers: map[types.UID]int{}, - }, - "delete-existing-shareable-unshared": { - inFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{ - claimUID: allocatedClaim, - }, - inFlightAllocationSharers: map[types.UID]int{ - otherClaimUID: 10, - }, claimUID: claimUID, - shareable: true, - expected: true, - expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{}, - expectedInFlightAllocationSharers: map[types.UID]int{ - otherClaimUID: 10, + forceRemove: true, + expected: false, + expectedInFlightAllocations: map[types.UID]inFlightAllocation{}, + }, + "delete-last-sharer": { + inFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 1}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, + }, + claimUID: claimUID, + expected: true, + expectedInFlightAllocations: map[types.UID]inFlightAllocation{ + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, }, - "keep-existing-shareable-shared": { - inFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{ - claimUID: allocatedClaim, + "force-delete-last-sharer": { + inFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 1}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, - inFlightAllocationSharers: map[types.UID]int{ - claimUID: 1, - otherClaimUID: 0, + claimUID: claimUID, + forceRemove: true, + expected: true, + expectedInFlightAllocations: 
map[types.UID]inFlightAllocation{ + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, - claimUID: claimUID, - shareable: true, - expected: false, - expectedInFlightAllocations: map[types.UID]*resourceapi.ResourceClaim{ - claimUID: allocatedClaim, + }, + "decrement-remaining-sharers": { + inFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 2}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, - expectedInFlightAllocationSharers: map[types.UID]int{ - claimUID: 1, - otherClaimUID: 0, + claimUID: claimUID, + expected: false, + expectedInFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 1}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, + }, + }, + "force-delete-remaining-sharers": { + inFlightAllocations: map[types.UID]inFlightAllocation{ + claimUID: {claim: allocatedClaim, sharers: 2}, + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, + }, + claimUID: claimUID, + forceRemove: true, + expected: true, + expectedInFlightAllocations: map[types.UID]inFlightAllocation{ + otherClaimUID: {claim: allocatedClaim2, sharers: 10}, }, }, } @@ -210,31 +193,14 @@ func testMaybeRemoveClaimPendingAllocation(tCtx ktesting.TContext) { for name, test := range tests { tCtx.Run(name, func(tCtx ktesting.TContext) { c := &claimTracker{ - logger: tCtx.Logger(), - inFlightAllocations: &sync.Map{}, - inFlightAllocationSharers: &sync.Map{}, + logger: tCtx.Logger(), + inFlightAllocations: make(map[types.UID]inFlightAllocation), } + maps.Copy(c.inFlightAllocations, test.inFlightAllocations) - for key, value := range test.inFlightAllocations { - c.inFlightAllocations.Store(key, value) - } - for key, value := range test.inFlightAllocationSharers { - c.inFlightAllocationSharers.Store(key, value) - } - actual := c.MaybeRemoveClaimPendingAllocation(test.claimUID, test.shareable) + actual := c.MaybeRemoveClaimPendingAllocation(test.claimUID, test.forceRemove) 
tCtx.Expect(actual).To(gomega.Equal(test.expected), "wrong value for deletion indicator") - actualInFlight := map[types.UID]*resourceapi.ResourceClaim{} - c.inFlightAllocations.Range(func(key, value any) bool { - actualInFlight[key.(types.UID)] = value.(*resourceapi.ResourceClaim) - return true - }) - tCtx.Expect(actualInFlight).To(gomega.Equal(test.expectedInFlightAllocations)) - actualSharers := map[types.UID]int{} - c.inFlightAllocationSharers.Range(func(key, value any) bool { - actualSharers[key.(types.UID)] = value.(int) - return true - }) - tCtx.Expect(actualSharers).To(gomega.Equal(test.expectedInFlightAllocationSharers)) + tCtx.Expect(c.inFlightAllocations).To(gomega.Equal(test.expectedInFlightAllocations)) }) } } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 749d7640ab9..11b5287fc67 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -115,11 +115,32 @@ func (d *stateData) Clone() fwk.StateData { return d } +// This state is persisted in the PodGroup CycleState. Because we save the +// pointer in fwk.CycleState, in the later phases we don't need to call Write +// method to update the value. +type podGroupStateData struct { + // pendingAllocations stores the UIDs of ResourceClaims that are currently + // pending for *this* PodGroup scheduling cycle. The DRAManager continues to + // track inter-cycle sharing of pending allocations, including the + // allocations themselves and how many Pods are sharing them. + // + // It does not need to be protected by a lock because a PodGroup's + // CycleState is only available during synchronous scheduling phases and is + // cleared before asynchronous phases begin. 
+ pendingAllocations sets.Set[types.UID] +} + +func (d *podGroupStateData) Clone() fwk.StateData { + return d +} + type informationForClaim struct { // Node selector based on the claim status if allocated. availableOnNodes *nodeaffinity.NodeSelector - // Set by Reserved, published by PreBind, empty if nothing had to be allocated. + // Set by Reserve, published by PreBind, empty if nothing had to be allocated. + // May be set by PreFilter if this allocation was made by a previous Pod's + // Reserve but not yet published in PreBind. allocation *resourceapi.AllocationResult } @@ -448,6 +469,11 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, s := &stateData{} state.Write(stateKey, s) + podGroupState, err := getPodGroupStateData(state) + if err != nil { + return nil, statusError(logger, err) + } + userClaims, err := pl.podResourceClaims(pod) if err != nil { return nil, statusUnschedulable(logger, err.Error()) @@ -479,22 +505,9 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) } - allocation := claim.Status.Allocation - // Gang scheduling works by waiting for all Pods in a gang to succeed - // through the Reserve phase, then binding all of the Pods. Since a - // claim's allocation is not recorded in [stateData.claims] until - // PreBind, we look for a pending allocation made in Reserve here to - // avoid blocking this Pod on waiting for the allocation to complete. 
- if pl.isSchedulingPodGroup(pod) && allocation == nil { - if pendingAllocation, found := pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID); found { - allocation = pendingAllocation - logger.V(5).Info("reusing pending allocation", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim), "uid", claim.UID, "allocation", klog.Format(allocation)) - } - } - - if allocation != nil { - if allocation.NodeSelector != nil { - nodeSelector, err := nodeaffinity.NewNodeSelector(allocation.NodeSelector) + if claim.Status.Allocation != nil { + if claim.Status.Allocation.NodeSelector != nil { + nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.NodeSelector) if err != nil { return nil, statusError(logger, err) } @@ -503,10 +516,38 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, } else { numClaimsToAllocate++ + // Gang scheduling works by waiting for all Pods in a gang to + // succeed through the Reserve phase, then binding all of the Pods. + // Since a claim's allocation is not recorded in [stateData.claims] + // until PreBind, we look for a pending allocation made in Reserve + // here to avoid blocking this Pod on waiting for the allocation to + // complete. + // + // Claims with pending allocations are considered "unallocated" in + // that the claim's Status.Allocation needs to be populated. Claims + // with pending allocations are not input to the allocator, though + // we increment numClaimsToAllocate to create an allocator anyway to + // signal that some claims need their status updated in PreBind. + // + // Currently, sharing is limited to a single PodGroup scheduling + // cycle. Ungrouped Pods and Pods across different PodGroup + // scheduling cycles cannot share claims with pending allocations + // for now to limit potential impact outside of the alpha + // GenericWorkload feature gate.
Sharing claims this way more + // broadly may have benefits: + // https://github.com/kubernetes/kubernetes/issues/137932 + if podGroupState != nil && podGroupState.pendingAllocations.Has(claim.UID) { + if pendingAllocation := pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID); pendingAllocation != nil { + s.informationsForClaim[index].allocation = pendingAllocation + logger.V(5).Info("reusing pending allocation", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim), "uid", claim.UID, "allocation", klog.Format(pendingAllocation)) + continue + } + } + // Allocation in flight? Better wait for that // to finish, see inFlightAllocations // documentation for details. - if _, pending := pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID); pending { + if pl.draManager.ResourceClaims().GetPendingAllocation(claim.UID) != nil { return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim))) } @@ -672,6 +713,29 @@ func getStateData(cs fwk.CycleState) (*stateData, error) { return s, nil } +func getPodGroupStateData(cs fwk.CycleState) (*podGroupStateData, error) { + podGroupCycleState := cs.GetPodGroupSchedulingCycle() + if podGroupCycleState == nil { + return nil, nil + } + state, err := podGroupCycleState.Read(stateKey) + if errors.Is(err, fwk.ErrNotFound) { + podGroupState := &podGroupStateData{ + pendingAllocations: make(sets.Set[types.UID]), + } + podGroupCycleState.Write(stateKey, podGroupState) + return podGroupState, nil + } + if err != nil { + return nil, err + } + s, ok := state.(*podGroupStateData) + if !ok { + return nil, errors.New("state is not podGroupStateData") + } + return s, nil +} + // Filter invoked at the filter extension point. // It evaluates if a pod can fit due to the resources it requests, // for both allocated and unallocated claims. 
@@ -766,11 +830,19 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs fwk.CycleState, pod * // This replaces the special ResourceClaim for extended resources with one // matching the node. claimsToAllocate := make([]*resourceapi.ResourceClaim, 0, state.claims.len()) + // pendingResult holds pending allocations that were made for a previous + // Pod in Reserve, but before that allocation is recorded in the status + // by PreBind. + var pendingResult []resourceapi.AllocationResult extendedResourceClaim := state.claims.extendedResourceClaim() - for _, claim := range state.claims.toAllocate() { + for index, claim := range state.claims.toAllocate() { if claim == extendedResourceClaim && nodeExtendedResourceClaim != nil { claim = nodeExtendedResourceClaim } + if state.informationsForClaim[index].allocation != nil { + pendingResult = append(pendingResult, *state.informationsForClaim[index].allocation) + continue + } claimsToAllocate = append(claimsToAllocate, claim) } allocationResult, err := state.allocator.Allocate(allocCtx, node, claimsToAllocate) @@ -812,13 +884,18 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs fwk.CycleState, pod * for i, claim := range claimsToAllocate { allocationsMap[claim.UID] = &allocationResult[i] } + for i, claim := range state.claims.toAllocate() { + if state.informationsForClaim[i].allocation != nil { + allocationsMap[claim.UID] = state.informationsForClaim[i].allocation + } + } nodeAllocatableClaimStatus, status = pl.calculateAndCheckNodeAllocatableResources(ctx, state, pod, nodeInfo, allocationsMap) if status != nil { return status } } // Reserve uses this information. - allocations = allocationResult + allocations = append(allocationResult, pendingResult...) } // Store information in state while holding the mutex. 
@@ -1078,6 +1155,11 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod logger := klog.FromContext(ctx) + podGroupState, err := getPodGroupStateData(cs) + if err != nil { + return statusError(logger, err) + } + numClaimsWithAllocator := 0 for _, claim := range state.claims.all() { if claim.Status.Allocation != nil { @@ -1087,18 +1169,6 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod // will work and only do it for real during binding. If it fails at // that time, some other pod was faster and we have to try again. - if pl.isSchedulingPodGroup(pod) && claim.UID != state.claims.getInitialExtendedResourceClaimUID() { - // Inform the tracker that this pod still needs the pending - // allocation. Then PreBind or Unreserve can know whether or not - // it's safe to remove the pending allocation. - // - // Extended resources claims cannot be shared, so we never - // signal sharing for it. - if err := pl.draManager.ResourceClaims().AddSharedClaimPendingAllocation(claim.UID, claim); err != nil { - return statusError(logger, err) - } - } - continue } @@ -1168,6 +1238,10 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod } logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "uid", claim.UID, "resourceVersion", claim.ResourceVersion, "allocation", klog.Format(allocation)) allocIndex++ + + if podGroupState != nil { + podGroupState.pendingAllocations.Insert(claim.UID) + } } } @@ -1187,28 +1261,33 @@ func (pl *DynamicResources) Unreserve(ctx context.Context, cs fwk.CycleState, po if state.claims.empty() { return } + podGroupState, err := getPodGroupStateData(cs) + if err != nil { + return + } logger := klog.FromContext(ctx) // we process user claims here first, extendedResourceClaim if any is handled below. 
for _, claim := range state.claims.allUserClaims() { - // Since several Pods may be sharing a claim with a pending allocation, - // make sure that we don't remove the pending allocation until the last - // Pod sharing it is Bound or Unreserved. - if pl.isSchedulingPodGroup(pod) { - if err := pl.draManager.ResourceClaims().RemoveSharedClaimPendingAllocation(claim.UID, claim); err != nil { - logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim)) - continue - } - } - // If allocation was in-flight, then it might not be anymore if no pods // still need the pending allocation. If the allocation was removed from // in-flight, we need to revert the claim object in the assume cache to - // what it was before. - if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claim.UID, pl.isSchedulingPodGroup(pod)); deleted { + // what it was before. The allocation is not removed until the last Pod + // sharing it is Bound or Unreserved. + if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claim.UID, false); deleted { logger.V(5).Info("Released resource in allocation result", "claim", klog.KObj(claim), "uid", claim.UID, "resourceVersion", claim.ResourceVersion, "allocation", klog.Format(claim.Status.Allocation)) pl.draManager.ResourceClaims().AssumedClaimRestore(claim.Namespace, claim.Name) + + // If we are currently asynchronously Binding Pods in a PodGroup, + // then the pendingAllocations set does not need to be updated. New + // PodGroup scheduling cycles will start with an empty set and not + // share pending allocations started in *this* cycle until Unreserve + // completes for all the Pods sharing that pending allocation and + // they can be Reserved again in another cycle. 
+ if podGroupState != nil { + podGroupState.pendingAllocations.Delete(claim.UID) + } } // Ignore claims reserved for the PodGroup because other Pods in the @@ -1264,9 +1343,14 @@ func (pl *DynamicResources) PreBind(ctx context.Context, cs fwk.CycleState, pod logger := klog.FromContext(ctx) + podGroupState, err := getPodGroupStateData(cs) + if err != nil { + return statusError(logger, err) + } + for index, claim := range state.claims.all() { if !resourceclaim.IsReservedForPod(pod, claim, pl.fts.EnableDRAWorkloadResourceClaims) { - claim, err := pl.bindClaim(ctx, state, index, pod, nodeName) + claim, err := pl.bindClaim(ctx, state, podGroupState, index, pod, nodeName) if err != nil { return statusError(logger, err) } @@ -1382,7 +1466,7 @@ func (pl *DynamicResources) PreBindPreFlight(ctx context.Context, cs fwk.CycleSt // bindClaim gets called by PreBind for claim which is not reserved for the pod yet. // It might not even be allocated. bindClaim then ensures that the allocation // and reservation are recorded. This finishes the work started in Reserve. -func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (*resourceapi.ResourceClaim, error) { +func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, podGroupState *podGroupStateData, index int, pod *v1.Pod, nodeName string) (*resourceapi.ResourceClaim, error) { logger := klog.FromContext(ctx) claim := state.claims.get(index) binding := state.claims.getBinding(index, pod) @@ -1408,6 +1492,13 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind // The scheduler was handling allocation. Now that has // completed, either successfully or with a failure. + + // The claim should remain in-flight in case any more Pods in the + // PodGroup are still using the pending allocation. If binding failed + // (e.g. 
due to a conflict in writing the allocation), then we signal + // that this Pod no longer needs the pending allocation without removing it. + forceRemovePendingAllocation := false + if resourceClaimModified { if isExtendedResourceClaim { pl.waitForExtendedClaimInAssumeCache(ctx, claim) @@ -1418,22 +1509,28 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind logger.V(5).Info("Claim not stored in assume cache", "err", err, "claim", klog.KObj(claim), "uid", claimUID, "resourceVersion", resourceVersion) } else { logger.V(5).Info("Claim stored in assume cache", "claim", klog.KObj(claim), "uid", claimUID, "resourceVersion", resourceVersion) + + // Once the pending allocation is recorded in the + // AssumeCache, we no longer need to keep it in-flight. + // Even if it happens to still be shared, the allocation in + // the AssumeCache is authoritative. + forceRemovePendingAllocation = true } } } if allocation != nil { for _, claimUID := range claimUIDs { - // Since several Pods may be sharing a claim with a pending - // allocation, make sure that we don't remove the pending - // allocation until the last Pod sharing it is Bound or Unreserved. - if pl.isSchedulingPodGroup(pod) { - if err := pl.draManager.ResourceClaims().RemoveSharedClaimPendingAllocation(claim.UID, claim); err != nil { - logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim)) - continue + if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claimUID, forceRemovePendingAllocation); deleted { + // If we are currently asynchronously Binding Pods in a + // PodGroup, then the pendingAllocations set does not need + // to be updated. New PodGroup scheduling cycles will start + // with an empty set and not share pending allocations + // started in *this* cycle until Unreserve completes for all + // the Pods sharing that pending allocation and they can be + // Reserved again in another cycle. 
+ if podGroupState != nil { + podGroupState.pendingAllocations.Delete(claim.UID) } - } - - if deleted := pl.draManager.ResourceClaims().MaybeRemoveClaimPendingAllocation(claimUID, pl.isSchedulingPodGroup(pod)); deleted { logger.V(5).Info("Removed claim from in-flight claims", "claim", klog.KObj(claim), "uid", claimUID, "resourceVersion", resourceVersion, "allocation", klog.Format(allocation)) } } @@ -1631,10 +1728,6 @@ func (pl *DynamicResources) isPodReadyForBinding(state *stateData) (bool, error) return true, nil } -func (pl *DynamicResources) isSchedulingPodGroup(pod *v1.Pod) bool { - return pl.fts.EnableGenericWorkload && pod.Spec.SchedulingGroup != nil -} - func (pl *DynamicResources) getPodGroup(pod *v1.Pod) (*schedulingapi.PodGroup, error) { if !pl.fts.EnableDRAWorkloadResourceClaims || pod.Spec.SchedulingGroup == nil || pod.Spec.SchedulingGroup.PodGroupName == nil { diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index a8d459dde3c..0efabe99fe8 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -3884,10 +3884,9 @@ func (tc *testContext) listAssumedClaims() ([]metav1.Object, []metav1.Object) { func (tc *testContext) listInFlightClaims() []metav1.Object { var inFlightClaims []metav1.Object - tc.draManager.resourceClaimTracker.inFlightAllocations.Range(func(key, value any) bool { - inFlightClaims = append(inFlightClaims, value.(*resourceapi.ResourceClaim)) - return true - }) + for _, inFlight := range tc.draManager.resourceClaimTracker.allInFlightAllocationsRLocked() { + inFlightClaims = append(inFlightClaims, inFlight.claim) + } sortObjects(inFlightClaims) return inFlightClaims } @@ -4968,7 +4967,7 @@ func testGatherAllocatedState(tCtx ktesting.TContext) { logger := klog.FromContext(tCtx) draManager := &DefaultDRAManager{ 
resourceClaimTracker: &claimTracker{ - inFlightAllocations: &sync.Map{}, + inFlightAllocations: make(map[types.UID]inFlightAllocation), allocatedDevices: newAllocatedDevices(logger), }, } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources_test.go index 63955078e16..a240bb685d8 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources_test.go @@ -75,30 +75,18 @@ func (m *mockDRAManager) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClai func (m *mockDRAManager) AssumedClaimRestore(namespace, name string) {} -func (m *mockDRAManager) ClaimHasPendingAllocation(uid types.UID) bool { - return false +func (m *mockDRAManager) GetPendingAllocation(uid types.UID) *resourceapi.AllocationResult { + return nil } func (m *mockDRAManager) SignalClaimPendingAllocation(uid types.UID, claim *resourceapi.ResourceClaim) error { return nil } -func (m *mockDRAManager) GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool) { - return nil, false -} - -func (m *mockDRAManager) MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool) { +func (m *mockDRAManager) MaybeRemoveClaimPendingAllocation(_ types.UID, _ bool) (deleted bool) { return false } -func (m *mockDRAManager) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error { - return nil -} - -func (m *mockDRAManager) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error { - return nil -} - func (m *mockDRAManager) List() ([]*resourceapi.ResourceClaim, error) { return nil, nil } diff --git a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go 
b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go index 73d1d6009ae..98cf7d9edf1 100644 --- a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go +++ b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go @@ -317,7 +317,9 @@ func TestGangSchedulingFlow(t *testing.T) { fakeActivator.activatedPods = nil cycleState := schedulerframework.NewCycleState() - cycleState.SetPodGroupSchedulingCycle(tt.isDuringPodGroupSchedulingCycle) + if tt.isDuringPodGroupSchedulingCycle { + cycleState.SetPodGroupSchedulingCycle(cycleState) + } pod := tt.pod.DeepCopy() pod.Spec.NodeName = "some-node" diff --git a/pkg/scheduler/schedule_one_podgroup.go b/pkg/scheduler/schedule_one_podgroup.go index f75480e2b88..0954d06736d 100644 --- a/pkg/scheduler/schedule_one_podgroup.go +++ b/pkg/scheduler/schedule_one_podgroup.go @@ -76,7 +76,7 @@ func (sched *Scheduler) scheduleOnePodGroup(ctx context.Context, podGroupInfo *f logger.V(3).Info("Attempting to schedule pod group", "podGroup", klog.KObj(podGroupInfo)) - sched.podGroupCycle(ctx, schedFwk, podGroupInfo) + sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo) } // frameworkForPodGroup obtains the concrete scheduler framework for the entire pod group. @@ -182,7 +182,7 @@ type podSchedulingContext struct { } // initPodSchedulingContext initializes the scheduling context of a single pod for pod group scheduling cycle. 
-func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, postFilterMode podGroupPostFilterMode) *podSchedulingContext { +func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, podGroupState *framework.CycleState, postFilterMode podGroupPostFilterMode) *podSchedulingContext { logger := klog.FromContext(ctx) // TODO(knelasevero): Remove duplicated keys from log entry calls // When contextualized logging hits GA @@ -201,7 +201,7 @@ func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, postFilterMode p state.Write(framework.PodsToActivateKey, podsToActivate) // Marks this cycle as a pod group scheduling cycle. - state.SetPodGroupSchedulingCycle(true) + state.SetPodGroupSchedulingCycle(podGroupState) // Skip post filters if requested. switch postFilterMode { @@ -219,7 +219,7 @@ func initPodSchedulingContext(ctx context.Context, pod *v1.Pod, postFilterMode p } // podGroupCycle runs a pod group scheduling cycle for the given pod group in a single cluster snapshot. -func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo) { +func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) { // Synchronously attempt to find a fit for the pod group. 
start := time.Now() @@ -229,17 +229,17 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr result := podGroupAlgorithmResult{ status: fwk.AsStatus(err), } - sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupInfo, result, start) + sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, result, start) return } - result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, runAllPostFilters) + result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, runAllPostFilters) metrics.PodGroupSchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) // Run workload aware preemption if required. If the preemption is successful, // we need to put the pods from pod group back into the scheduling queue. if sched.workloadAwarePreemptionEnabled && result.status.Code() == fwk.Unschedulable { - status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupInfo) + status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupCycleState, podGroupInfo) if status.IsSuccess() { result.waitingOnPreemption = true } else if status.IsError() { @@ -249,7 +249,7 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr } } - sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupInfo, result, start) + sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, result, start) } // runWorkloadAwarePreemption runs workload-aware preemption for the given pod group. @@ -259,7 +259,7 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr // original state. // The function used for evaluating feasibility of pod group scheduling is // scheduler.podGroupSchedulingAlgorithm run without any post filters. 
-func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo) *fwk.Status { +func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) *fwk.Status { // Default preemption should be the only pod group post filter registered plugin. plugins := schedFwk.PodGroupPostFilterPlugins() if len(plugins) == 0 { @@ -281,7 +281,7 @@ func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk defer restoreFn() return plugins[0].PodGroupPostFilter(ctx, pg, podGroupInfo.UnscheduledPods, func(ctx context.Context) *fwk.Status { - res := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, runWithoutPostFilters) + res := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, runWithoutPostFilters) return res.status }) } @@ -353,7 +353,7 @@ type podGroupAlgorithmResult struct { // It tries to schedule each pod using standard filtering and scoring logic in a fixed order. // If a pod requires preemption to be schedulable, subsequent pods in the algorithm // treat that pod as already scheduled on that node with victims being already removed in memory. 
-func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult { +func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult { result := podGroupAlgorithmResult{ podResults: make([]algorithmResult, 0, len(podGroupInfo.QueuedPodInfos)), status: fwk.NewStatus(fwk.Unschedulable).WithError(errPodGroupUnschedulable), @@ -365,7 +365,7 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, requiresPreemption := false for _, podInfo := range podGroupInfo.QueuedPodInfos { - podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, podInfo, postFilterMode) + podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, podInfo, postFilterMode) result.podResults = append(result.podResults, podResult) if !podResult.status.IsSuccess() && !podResult.requiresPreemption { // When a pod is not feasible and doesn't require preemption, it means that it failed scheduling. @@ -403,9 +403,9 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, // podGroupPodSchedulingAlgorithm runs a scheduling algorithm for individual pod from a pod group. // It returns the algorithm result and, if successful or the preemption is required, the permit status together with the revert function. 
-func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo, postFilterMode podGroupPostFilterMode) (algorithmResult, func()) { +func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo, postFilterMode podGroupPostFilterMode) (algorithmResult, func()) { pod := podInfo.Pod - podCtx := initPodSchedulingContext(ctx, pod, postFilterMode) + podCtx := initPodSchedulingContext(ctx, pod, podGroupCycleState, postFilterMode) logger := podCtx.logger ctx = klog.NewContext(ctx, logger) start := time.Now() @@ -492,7 +492,7 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche // for the next pod group scheduling cycle. // If the preemption is required for this pod group, all pods are moved back to the scheduling queue // and require the next pod group scheduling cycle to verify the preemption outcome. -func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, podGroupResult podGroupAlgorithmResult, start time.Time) { +func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, schedFwk framework.Framework, podGroupState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podGroupResult podGroupAlgorithmResult, start time.Time) { logger := klog.FromContext(ctx) var scheduledPods, unschedulablePods int @@ -504,7 +504,7 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched // In pod group-level unschedulable or error cases, podResult may not be defined. // Initialize it now to handle pod failure correctly. 
podResult = algorithmResult{ - podCtx: initPodSchedulingContext(ctx, pInfo.Pod, runAllPostFilters), + podCtx: initPodSchedulingContext(ctx, pInfo.Pod, podGroupState, runAllPostFilters), status: podGroupResult.status.Clone(), } } @@ -531,7 +531,7 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched switch { case podGroupResult.status.IsSuccess(): // Disable pod group scheduling in cycle state before binding. - podCtx.state.SetPodGroupSchedulingCycle(false) + podCtx.state.SetPodGroupSchedulingCycle(nil) // Schedule result is applied for pod and its binding cycle executes. assumedPodInfo, status := sched.prepareForBindingCycle(ctx, podCtx.state, schedFwk, pInfo, podCtx.podsToActivate, podResult.scheduleResult) if !status.IsSuccess() { @@ -648,7 +648,7 @@ func (sched *Scheduler) updatePodGroupCondition(ctx context.Context, // Placement is a set of nodes that will be considered when scheduling a pod group. // Then for each placement it tries to schedule the pod group through podGroupSchedulingDefaultAlgorithm. // Finally, it runs placement scorer plugins to select the best placement. -func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult { +func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult { logger := klog.FromContext(ctx) allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List() if err != nil { @@ -657,7 +657,6 @@ func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context } } - podGroupCycleState := framework.NewCycleState() // For now, always record plugin metrics until we understand its impact on performance. 
podGroupCycleState.SetRecordPluginMetrics(true) placements, status := schedFwk.RunPlacementGeneratePlugins(ctx, podGroupCycleState, podGroupInfo.PodGroupInfo, allNodes) @@ -678,7 +677,7 @@ func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context status: fwk.AsStatus(fmt.Errorf("failed to assume pod group placement: %w", err)), } } - result := sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupInfo, postFilterMode) + result := sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode) sched.nodeInfoSnapshot.ForgetPlacement() if result.status.IsError() { return result @@ -768,13 +767,13 @@ func makePodGroupAssignments(successfulResults map[*fwk.Placement]*podGroupAlgor } // podGroupSchedulingAlgorithm attempts to schedule pods in the pod group according to the policy and constraints and returns the scheduling result for each pod in the pod group. -func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult { +func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult { podGroupCycleCtx, cancel := context.WithCancel(ctx) defer cancel() if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareWorkloadScheduling) { - return sched.podGroupSchedulingPlacementAlgorithm(podGroupCycleCtx, schedFwk, podGroupInfo, postFilterMode) + return sched.podGroupSchedulingPlacementAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode) } else { - return sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, podGroupInfo, postFilterMode) + return sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, 
podGroupCycleState, podGroupInfo, postFilterMode) } } diff --git a/pkg/scheduler/schedule_one_podgroup_test.go b/pkg/scheduler/schedule_one_podgroup_test.go index 5a6556e7345..9a0347dc6ca 100644 --- a/pkg/scheduler/schedule_one_podgroup_test.go +++ b/pkg/scheduler/schedule_one_podgroup_test.go @@ -417,7 +417,7 @@ func TestPodGroupCycle_UpdateSnapshotError(t *testing.T) { }, } - sched.podGroupCycle(ctx, schedFwk, podGroupInfo) + sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo) if !failureHandlerCalled { t.Errorf("Expected FailureHandler to be called after UpdateSnapshot failed") @@ -559,7 +559,7 @@ func TestPodGroupCycle_PodGroupPostFilter(t *testing.T) { } sched.SchedulePod = sched.schedulePod - sched.podGroupCycle(ctx, schedFwk, podGroupInfo) + sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo) if fakePlugin.podGroupPostFilterCalled != tt.expectedPodGroupPostFilterCalled { t.Errorf("Expected workload aware preemption (PodGroupPostFilter) to be %v, but got %v", tt.expectedPodGroupPostFilterCalled, fakePlugin.podGroupPostFilterCalled) @@ -1020,7 +1020,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { t.Fatalf("Failed to update snapshot: %v", err) } - result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, runAllPostFilters) + result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, framework.NewCycleState(), podGroupInfo, runAllPostFilters) if result.status.Code() != tt.expectedGroupStatusCode { t.Errorf("Expected group status code: %v, got: %v", tt.expectedGroupStatusCode, result.status.Code()) @@ -1636,13 +1636,15 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, } + podGroupCycleState := framework.NewCycleState() + for i := range tt.algorithmResult.podResults { pod := podGroupInfo.QueuedPodInfos[i].Pod - podCtx := initPodSchedulingContext(ctx, pod, runAllPostFilters) + podCtx := initPodSchedulingContext(ctx, pod, podGroupCycleState, runAllPostFilters) 
tt.algorithmResult.podResults[i].podCtx = podCtx } - sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupInfo, tt.algorithmResult, time.Now()) + sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, tt.algorithmResult, time.Now()) if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { lock.Lock() @@ -2340,7 +2342,7 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) { }, } - result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, pgInfo, runAllPostFilters) + result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, framework.NewCycleState(), pgInfo, runAllPostFilters) opts := cmp.Options{ cmp.AllowUnexported( @@ -2502,7 +2504,7 @@ func TestPodGroupSchedulingPlacementAlgorithm_Scoring(t *testing.T) { }, } - result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, pgInfo, runAllPostFilters) + result := sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, framework.NewCycleState(), pgInfo, runAllPostFilters) expectedHost := placements[tt.expectedPlacement][0] actualHost := result.podResults[0].scheduleResult.SuggestedHost @@ -2697,7 +2699,7 @@ func TestRunWorkloadAwarePreemption(t *testing.T) { // Just inject logger explicitly in context to avoid panic ctx = klog.NewContext(ctx, logger) - status := sched.runWorkloadAwarePreemption(ctx, schedFwk, tt.podGroupInfo) + status := sched.runWorkloadAwarePreemption(ctx, schedFwk, framework.NewCycleState(), tt.podGroupInfo) if tt.expectedStatus.Code() != status.Code() || tt.expectedStatus.Message() != status.Message() { t.Errorf("Unexpected status, want code %v message %q, got code %v message %q", diff --git a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go index 965993ba8e3..4d103b2e39c 100644 --- a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go +++ 
b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go @@ -98,9 +98,12 @@ type CycleState interface { // or doesn't belong to any pod group. // This field can only be set to true when GenericWorkload feature flag is enabled. IsPodGroupSchedulingCycle() bool - // SetPodGroupSchedulingCycle sets whether this cycle is a pod group scheduling cycle or not. + // GetPodGroupSchedulingCycle gets the cycle state of the PodGroup for a Pod. // This should be only used when GenericWorkload feature flag is enabled. - SetPodGroupSchedulingCycle(bool) + GetPodGroupSchedulingCycle() PodGroupCycleState + // SetPodGroupSchedulingCycle sets the cycle state of the PodGroup for a Pod. + // This should be only used when GenericWorkload feature flag is enabled. + SetPodGroupSchedulingCycle(PodGroupCycleState) } // PodGroupCycleState provides a mechanism for plugins that operate on pod groups to store and retrieve arbitrary data. diff --git a/staging/src/k8s.io/kube-scheduler/framework/listers.go b/staging/src/k8s.io/kube-scheduler/framework/listers.go index 3faf75bb3c7..701e81408fb 100644 --- a/staging/src/k8s.io/kube-scheduler/framework/listers.go +++ b/staging/src/k8s.io/kube-scheduler/framework/listers.go @@ -105,25 +105,20 @@ type ResourceClaimTracker interface { GatherAllocatedState() (*structured.AllocatedState, error) // SignalClaimPendingAllocation signals to the tracker that the given ResourceClaim will be allocated via an API call in the - // binding phase. This change is immediately reflected in the result of List() and the other accessors. + // binding phase, therefore the given ResourceClaim must be non-nil and have a non-nil Status.Allocation. + // If the claim already has a pending allocation, then the allocation becomes shared. The same number of SignalClaimPendingAllocation() callers + // for a given claimUID is expected to eventually call MaybeRemoveClaimPendingAllocation() for that claimUID. 
+ // This change is immediately reflected in the result of List() and the other accessors. SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error - // GetPendingAllocation answers whether a given claim has a pending allocation during the binding phase. It can be used to avoid + // GetPendingAllocation returns the pending allocation, if any, that a given claim has during the binding phase. It can be used to avoid // race conditions in subsequent scheduling phases. - GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool) + GetPendingAllocation(claimUID types.UID) *resourceapi.AllocationResult // MaybeRemoveClaimPendingAllocation might remove the pending allocation for the given ResourceClaim from the tracker if any was signaled via - // SignalClaimPendingAllocation(). When a pending allocation is `shareable`, it removes the pending allocation only when - // no other pods are still using that pending allocation (per AddSharedClaimPendingAllocation and - // RemoveSharedClaimPendingAllocation). When a pending allocation is not shareable, it always removes the pending - // allocation as long as one exists. Returns whether there was a pending allocation and it was removed. - // List() and the other accessors immediately stop reflecting the pending allocation in the results. - MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool) - - // AddSharedClaimPendingAllocation increments the number of active sharers - // of the given ResourceClaim. - AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error - // RemoveSharedClaimPendingAllocation decrements the number of active sharers - // of the given ResourceClaim. - RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error + // SignalClaimPendingAllocation(). When `forceRemove` is true, it always removes the pending allocation.
Otherwise, it removes the pending + // allocation only when no other pods are still using that pending allocation (from SignalClaimPendingAllocation and AcquirePendingAllocation). + // Returns whether there was a pending allocation and it was removed. + // List() and the other accessors immediately stop reflecting the pending allocation in the results when the pending allocation is removed. + MaybeRemoveClaimPendingAllocation(claimUID types.UID, forceRemove bool) (deleted bool) // AssumeClaimAfterAPICall signals to the tracker that an API call modifying the given ResourceClaim was made in the binding phase, and the // changes should be reflected in informers very soon. This change is immediately reflected in the result of List() and the other accessors.