From e1b18e34ffa8cecbe8d88a26151ee821128f7a61 Mon Sep 17 00:00:00 2001 From: Omar Sayed Date: Mon, 9 Mar 2026 02:54:22 +0000 Subject: [PATCH] snapshot pod group state before scheduling cycle and embed pod group manager into cache --- pkg/scheduler/backend/cache/cache.go | 199 +++++- pkg/scheduler/backend/cache/cache_test.go | 662 +++++++++++++++++- pkg/scheduler/backend/cache/interface.go | 13 + pkg/scheduler/backend/cache/podgroupstate.go | 354 ++++++++++ .../podgroupstate_test.go | 127 ++-- pkg/scheduler/backend/cache/snapshot.go | 73 +- .../podgroupmanager/podgroupmanager.go | 134 ---- .../podgroupmanager/podgroupmanager_test.go | 280 -------- .../backend/podgroupmanager/podgroupstate.go | 248 ------- pkg/scheduler/eventhandlers.go | 20 +- pkg/scheduler/eventhandlers_test.go | 10 +- pkg/scheduler/extender_test.go | 2 +- .../lister_contract_test.go | 42 ++ pkg/scheduler/framework/cycle_state.go | 11 + .../defaultbinder/default_binder_test.go | 2 +- .../default_preemption_test.go | 4 +- .../plugins/gangscheduling/gangscheduling.go | 61 +- .../gangscheduling/gangscheduling_test.go | 115 ++- .../framework/preemption/executor_test.go | 4 +- .../framework/preemption/preemption_test.go | 2 +- pkg/scheduler/framework/runtime/batch_test.go | 4 + pkg/scheduler/framework/runtime/framework.go | 2 +- pkg/scheduler/schedule_one_podgroup.go | 7 +- pkg/scheduler/schedule_one_podgroup_test.go | 22 +- pkg/scheduler/schedule_one_test.go | 49 +- pkg/scheduler/scheduler.go | 15 +- pkg/scheduler/scheduler_test.go | 4 +- .../kube-scheduler/framework/cycle_state.go | 4 + .../kube-scheduler/framework/interface.go | 2 +- .../kube-scheduler/framework/listers.go | 27 +- .../scheduler/podgroup/podgroup_test.go | 3 - 31 files changed, 1593 insertions(+), 909 deletions(-) create mode 100644 pkg/scheduler/backend/cache/podgroupstate.go rename pkg/scheduler/backend/{podgroupmanager => cache}/podgroupstate_test.go (63%) delete mode 100644 pkg/scheduler/backend/podgroupmanager/podgroupmanager.go 
delete mode 100644 pkg/scheduler/backend/podgroupmanager/podgroupmanager_test.go delete mode 100644 pkg/scheduler/backend/podgroupmanager/podgroupstate.go diff --git a/pkg/scheduler/backend/cache/cache.go b/pkg/scheduler/backend/cache/cache.go index 7683791a8e0..d8dc6a4a400 100644 --- a/pkg/scheduler/backend/cache/cache.go +++ b/pkg/scheduler/backend/cache/cache.go @@ -41,8 +41,8 @@ var ( // New returns a Cache implementation. // It automatically starts a go routine that exports cache metrics. // "ctx" is the context that would close the background goroutine. -func New(ctx context.Context, apiDispatcher fwk.APIDispatcher) Cache { - cache := newCache(ctx, updateMetricsPeriod, apiDispatcher) +func New(ctx context.Context, apiDispatcher fwk.APIDispatcher, genericWorkloadEnabled bool) Cache { + cache := newCache(ctx, updateMetricsPeriod, apiDispatcher, genericWorkloadEnabled) cache.run() return cache } @@ -74,7 +74,10 @@ type cacheImpl struct { nodeTree *nodeTree // A map from image name to its ImageStateSummary. imageStates map[string]*fwk.ImageStateSummary - + // podGroupStates stores the runtime state for each known pod group (only if GenericWorkload feature gate is enabled). + podGroupStates map[podGroupKey]*podGroupState + // genericWorkloadEnabled stores the GenericWorkload feature gate value. + genericWorkloadEnabled bool // apiDispatcher is used for the methods that are expected to send API calls. // It's non-nil only if the SchedulerAsyncAPICalls feature gate is enabled. 
apiDispatcher fwk.APIDispatcher @@ -84,18 +87,20 @@ type podState struct { pod *v1.Pod } -func newCache(ctx context.Context, period time.Duration, apiDispatcher fwk.APIDispatcher) *cacheImpl { +func newCache(ctx context.Context, period time.Duration, apiDispatcher fwk.APIDispatcher, genericWorkloadEnabled bool) *cacheImpl { logger := klog.FromContext(ctx) return &cacheImpl{ period: period, stop: ctx.Done(), - nodes: make(map[string]*nodeInfoListItem), - nodeTree: newNodeTree(logger, nil), - assumedPods: sets.New[string](), - podStates: make(map[string]*podState), - imageStates: make(map[string]*fwk.ImageStateSummary), - apiDispatcher: apiDispatcher, + nodes: make(map[string]*nodeInfoListItem), + nodeTree: newNodeTree(logger, nil), + assumedPods: sets.New[string](), + podStates: make(map[string]*podState), + imageStates: make(map[string]*fwk.ImageStateSummary), + podGroupStates: make(map[podGroupKey]*podGroupState), + genericWorkloadEnabled: genericWorkloadEnabled, + apiDispatcher: apiDispatcher, } } @@ -284,9 +289,32 @@ func (cache *cacheImpl) UpdateSnapshot(logger klog.Logger, nodeSnapshot *Snapsho return errors.New(errMsg) } + // Take a snapshot of pod group states for this scheduling cycle. + cache.updatePodGroupStateSnapshot(nodeSnapshot) + return nil } +// updatePodGroupStateSnapshot updates the pod group state portion of the given snapshot. +// It assumes that the cache lock is already held. +// It removes entries that no longer exist in the live cache +// and clones entries whose generation has advanced since the last snapshot. +func (cache *cacheImpl) updatePodGroupStateSnapshot(snapshot *Snapshot) { + // Remove pod group states from snapshot that no longer exist in cache. + for key := range snapshot.podGroupStates { + if _, exists := cache.podGroupStates[key]; !exists { + delete(snapshot.podGroupStates, key) + } + } + // Clone only pod group states that changed since the last snapshot. 
+ for key, podGroupState := range cache.podGroupStates { + if existing, ok := snapshot.podGroupStates[key]; ok && existing.generation == podGroupState.generation { + continue + } + snapshot.podGroupStates[key] = podGroupState.snapshot() + } +} + func (cache *cacheImpl) updateNodeInfoSnapshotList(logger klog.Logger, snapshot *Snapshot, updateAll bool) { snapshot.havePodsWithAffinityNodeInfoList = make([]fwk.NodeInfo, 0, cache.nodeTree.numNodes) snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]fwk.NodeInfo, 0, cache.nodeTree.numNodes) @@ -400,7 +428,7 @@ func (cache *cacheImpl) ForgetPod(logger klog.Logger, pod *v1.Pod) error { } // Only assumed pod can be forgotten. if cache.assumedPods.Has(key) { - return cache.removePod(logger, pod) + return cache.removePod(logger, pod, true) } return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod)) } @@ -425,12 +453,21 @@ func (cache *cacheImpl) addPod(logger klog.Logger, pod *v1.Pod, assumePod bool) if assumePod { cache.assumedPods.Insert(key) } + + if !cache.isPodGroupMember(pod) { + return nil + } + if assumePod { + cache.assumePodGroupMember(pod) + } else { + cache.addPodGroupMember(pod) + } return nil } // Assumes that lock is already acquired. func (cache *cacheImpl) updatePod(logger klog.Logger, oldPod, newPod *v1.Pod) error { - if err := cache.removePod(logger, oldPod); err != nil { + if err := cache.removePod(logger, oldPod, false); err != nil { return err } return cache.addPod(logger, newPod, false) @@ -440,7 +477,7 @@ func (cache *cacheImpl) updatePod(logger klog.Logger, oldPod, newPod *v1.Pod) er // Removes a pod from the cached node info. If the node information was already // removed and there are no more pods left in the node, cleans up the node from // the cache. 
-func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod) error { +func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod, forgetPod bool) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -462,6 +499,16 @@ func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod) error { delete(cache.podStates, key) delete(cache.assumedPods, key) + + if !cache.isPodGroupMember(pod) { + return nil + } + if forgetPod { + cache.forgetPodGroupMember(logger, pod) + } else { + cache.removePodGroupMember(pod) + } + return nil } @@ -546,7 +593,7 @@ func (cache *cacheImpl) RemovePod(logger klog.Logger, pod *v1.Pod) error { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } - return cache.removePod(logger, currState.pod) + return cache.removePod(logger, currState.pod, false) } func (cache *cacheImpl) IsAssumedPod(pod *v1.Pod) (bool, error) { @@ -714,6 +761,130 @@ func (cache *cacheImpl) updateMetrics() { metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes))) } +// isPodGroupMember returns true if the pod belongs to a pod group, +// provided that GenericWorkload feature gate is enabled. +func (cache *cacheImpl) isPodGroupMember(pod *v1.Pod) bool { + return cache.genericWorkloadEnabled && pod.Spec.SchedulingGroup != nil +} + +// AddPodGroupMember adds not assigned and not assumed pod to its pod group state in the cache. +func (cache *cacheImpl) AddPodGroupMember(pod *v1.Pod) { + if !cache.isPodGroupMember(pod) { + return + } + cache.mu.Lock() + defer cache.mu.Unlock() + + cache.addPodGroupMember(pod) +} + +// UpdatePodGroupMember updates a pod's entry inside its pod group state in the cache. +func (cache *cacheImpl) UpdatePodGroupMember(logger klog.Logger, oldPod, newPod *v1.Pod) { + if !cache.isPodGroupMember(newPod) { + return + } + cache.mu.Lock() + defer cache.mu.Unlock() + + cache.updatePodGroupMember(logger, oldPod, newPod) +} + +// RemovePodGroupMember removes the pod from its pod group state in the cache. 
+func (cache *cacheImpl) RemovePodGroupMember(pod *v1.Pod) { + if !cache.isPodGroupMember(pod) { + return + } + cache.mu.Lock() + defer cache.mu.Unlock() + + cache.removePodGroupMember(pod) +} + +// addPodGroupMember adds the pod to its pod group state, creating the group entry if it doesn't exist yet. +// Assumes that the cache lock is already held. +func (cache *cacheImpl) addPodGroupMember(pod *v1.Pod) { + key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName) + podGroupState, exists := cache.podGroupStates[key] + if !exists { + podGroupState = newPodGroupState() + cache.podGroupStates[key] = podGroupState + } + + podGroupState.addPod(pod) +} + +// updatePodGroupMember updates the pod entry inside its pod group state. +// Assumes that the cache lock is already held. +func (cache *cacheImpl) updatePodGroupMember(logger klog.Logger, oldPod, newPod *v1.Pod) { + key := newPodGroupKey(newPod.Namespace, *newPod.Spec.SchedulingGroup.PodGroupName) + podGroupState, exists := cache.podGroupStates[key] + if !exists { + // This should not happen: the pod group state should have been already created by a prior pod add action. + utilruntime.HandleErrorWithLogger(logger, nil, "Pod group state not found for update, this indicates a missed add event", "pod", klog.KObj(newPod), "podGroupKey", key) + return + } + + podGroupState.updatePod(oldPod, newPod) +} + +// removePodGroupMember removes the pod from its pod group state, deleting the group entry when empty. +// Assumes that the cache lock is already held. +func (cache *cacheImpl) removePodGroupMember(pod *v1.Pod) { + key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName) + podGroupState, exists := cache.podGroupStates[key] + if !exists { + return + } + podGroupState.deletePod(pod.UID) + if podGroupState.empty() { + delete(cache.podGroupStates, key) + } +} + +// assumePodGroupMember marks the pod as assumed in its pod group state. +// Assumes that the cache lock is already held. 
+func (cache *cacheImpl) assumePodGroupMember(pod *v1.Pod) { + key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName) + podGroupState, exists := cache.podGroupStates[key] + if !exists { + podGroupState = newPodGroupState() + cache.podGroupStates[key] = podGroupState + podGroupState.allPods[pod.UID] = pod + } + podGroupState.assumePod(pod.UID) +} + +// forgetPodGroupMember moves the pod back from assumed to unscheduled in its pod group state. +// Assumes that the cache lock is already held. +func (cache *cacheImpl) forgetPodGroupMember(logger klog.Logger, pod *v1.Pod) { + key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName) + pgs, exists := cache.podGroupStates[key] + if !exists { + // This should not happen: the pod group state should have been already created by a prior pod add or assume action. + utilruntime.HandleErrorWithLogger(logger, nil, "Pod group state not found for forget, this indicates a missed add or assume event", "pod", klog.KObj(pod), "podGroupKey", key) + return + } + pgs.forgetPod(pod.UID) +} + +// PodGroupStates returns the PodGroupStateLister for this cache. +func (cache *cacheImpl) PodGroupStates() fwk.PodGroupStateLister { + return cache +} + +// Get returns the pod group state for the given pod group. +func (cache *cacheImpl) Get(namespace string, podGroupName string) (fwk.PodGroupState, error) { + cache.mu.RLock() + defer cache.mu.RUnlock() + + key := newPodGroupKey(namespace, podGroupName) + podGroupState, exists := cache.podGroupStates[key] + if !exists { + return nil, fmt.Errorf("pod group state not found for pod group %s", key) + } + return podGroupState, nil +} + // BindPod handles the pod binding by adding a bind API call to the dispatcher. // This method should be used only if the SchedulerAsyncAPICalls feature gate is enabled. 
func (cache *cacheImpl) BindPod(binding *v1.Binding) (<-chan error, error) { diff --git a/pkg/scheduler/backend/cache/cache_test.go b/pkg/scheduler/backend/cache/cache_test.go index 848613a55f0..5f17efa7b90 100644 --- a/pkg/scheduler/backend/cache/cache_test.go +++ b/pkg/scheduler/backend/cache/cache_test.go @@ -47,6 +47,12 @@ var nodeInfoCmpOpts = []cmp.Option{ cmpopts.IgnoreFields(framework.PodInfo{}, "cachedResource"), } +var podGroupStateCmpOpts = []cmp.Option{ + cmp.AllowUnexported(podGroupStateSnapshot{}, podGroupStateData{}, podGroupKey{}), + cmpopts.IgnoreFields(podGroupStateData{}, "generation"), + cmpopts.EquateEmpty(), +} + func init() { metrics.Register() } @@ -235,7 +241,7 @@ func TestAssumePodScheduled(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for _, pod := range tc.pods { if err := cache.AssumePod(logger, pod); err != nil { t.Fatalf("AssumePod failed: %v", err) @@ -296,7 +302,7 @@ func TestAddPodWillConfirm(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for _, podToAssume := range test.podsToAssume { if err := cache.AssumePod(logger, podToAssume); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -349,7 +355,7 @@ func TestDump(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for _, podToAssume := range test.podsToAssume { if err := cache.AssumePod(logger, podToAssume); err != nil { t.Errorf("assumePod failed: %v", err) @@ -415,7 +421,7 @@ func TestAddPodAlwaysUpdatesPodInfoInNodeInfo(t *testing.T) { }, } - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, 
time.Second, nil, false) for _, podToAssume := range test.podsToAssume { if err := cache.AssumePod(logger, podToAssume); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -471,7 +477,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for _, podToAssume := range test.podsToAssume { if err := cache.AssumePod(logger, podToAssume); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -540,7 +546,7 @@ func TestUpdatePod(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for _, podToAdd := range test.podsToAdd { if err := cache.AddPod(logger, podToAdd); err != nil { t.Fatalf("AddPod failed: %v", err) @@ -600,7 +606,7 @@ func TestUpdatePodAndGet(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) // trying to get an unknown pod should return an error // podToUpdate has not been added yet if _, err := cache.GetPod(tc.podToUpdate); err == nil { @@ -666,7 +672,7 @@ func TestEphemeralStorageResource(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) if err := cache.AddPod(logger, test.pod); err != nil { t.Fatalf("AddPod failed: %v", err) } @@ -683,6 +689,390 @@ func TestEphemeralStorageResource(t *testing.T) { } } +func Test_AddPodGroupMember(t *testing.T) { + podGroupName := "pg" + // Pod with no pod group name. 
+ pod1 := st.MakePod().Namespace("namespace").Name("non-workload-pod").Obj() + // Unscheduled pod with a pod group name. + pod2 := st.MakePod().Namespace("namespace").Name("unscheduled-pod").PodGroupName(podGroupName).Obj() + // Assigned pod with the same pod group name. + pod3 := st.MakePod().Namespace("namespace").Name("assigned-pod").Node("node1").PodGroupName(podGroupName).Obj() + + tests := []struct { + name string + pod *v1.Pod + genericWorkloadEnabled bool + expectInUnscheduledPods bool + expectInAssignedPods bool + }{ + { + name: "generic workload disabled", + pod: pod2, + genericWorkloadEnabled: false, + }, + { + name: "pod with no pod group name", + pod: pod1, + genericWorkloadEnabled: true, + }, + { + name: "unscheduled pod with a pod group name", + pod: pod2, + genericWorkloadEnabled: true, + expectInUnscheduledPods: true, + }, + { + name: "assigned pod with a pod group name", + pod: pod3, + genericWorkloadEnabled: true, + expectInAssignedPods: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cache := newCache(context.Background(), time.Second, nil, tt.genericWorkloadEnabled) + cache.AddPodGroupMember(tt.pod) + + if tt.pod.Spec.SchedulingGroup == nil { + if tt.expectInAssignedPods || tt.expectInUnscheduledPods { + t.Errorf("Expected pod group to exist, but pod has no pod group") + } + return + } + + podGroupState, err := cache.PodGroupStates().Get(tt.pod.Namespace, *tt.pod.Spec.SchedulingGroup.PodGroupName) + if err != nil { + if tt.genericWorkloadEnabled { + t.Errorf("Expected pod group to exist, but got error: %v", err) + } + return + } + + _, inUnscheduledPods := podGroupState.UnscheduledPods()[tt.pod.Name] + if inUnscheduledPods != tt.expectInUnscheduledPods { + t.Errorf("expected pod in UnscheduledPods: %v, got %v", tt.expectInUnscheduledPods, inUnscheduledPods) + } + + if inAssignedPods := podGroupState.AssignedPods().Has(tt.pod.UID); inAssignedPods != tt.expectInAssignedPods { + t.Errorf("expected pod in 
AssignedPods: %v, got %v", tt.expectInAssignedPods, inAssignedPods) + } + }) + } +} + +func Test_UpdatePodGroupMember(t *testing.T) { + podGroupName := "pg" + // unscheduled pod with a pod group name + pod := st.MakePod().Namespace("namespace").Name("unscheduled-pod").UID("pod1"). + PodGroupName(podGroupName).Obj() + // updated unscheduled pod with a pod group name + updatedPod := st.MakePod().Namespace("namespace").Name("unscheduled-pod").UID("pod1"). + Labels(map[string]string{"foo": "bar"}).PodGroupName(podGroupName).Obj() + // assigned pod with a pod group name + assignedPod := st.MakePod().Namespace("namespace").Name("assigned-pod").UID("pod2").Node("node").PodGroupName(podGroupName).Obj() + // pod with no pod group name + noPodGroupPod := st.MakePod().Namespace("namespace").Name("no-pod-group-pod").UID("pod3").Obj() + // updated pod with no pod group name + updatedNoPodGroupPod := st.MakePod().Namespace("namespace").Name("no-pod-group-pod").UID("pod3"). + Labels(map[string]string{"foo": "bar"}).Obj() + + tests := []struct { + name string + isAssumedPod bool + oldPod *v1.Pod + newPod *v1.Pod + genericWorkloadEnabled bool + expectInAssumedPods bool + expectInUnscheduledPods bool + expectInAssignedPods bool + }{ + { + name: "updating a pod with genericWorkload disabled should be a no-op", + oldPod: pod, + newPod: updatedPod, + genericWorkloadEnabled: false, + expectInUnscheduledPods: true, + }, + { + name: "update a pod with no pod group name should be a no-op", + oldPod: noPodGroupPod, + newPod: updatedNoPodGroupPod, + genericWorkloadEnabled: true, + }, + { + name: "update a pod", + isAssumedPod: true, + oldPod: pod, + newPod: updatedPod, + genericWorkloadEnabled: true, + expectInUnscheduledPods: true, + }, + { + name: "update a pod, move to assigned", + isAssumedPod: true, + oldPod: pod, + newPod: assignedPod, + genericWorkloadEnabled: true, + expectInAssignedPods: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, _ 
:= ktesting.NewTestContext(t) + cache := newCache(context.Background(), time.Second, nil, true) + cache.AddPodGroupMember(tt.oldPod) + cache.genericWorkloadEnabled = tt.genericWorkloadEnabled + + newPod := tt.newPod + if newPod == nil { + newPod = tt.oldPod + } + cache.UpdatePodGroupMember(logger, tt.oldPod, newPod) + + if newPod.Spec.SchedulingGroup == nil { + if tt.expectInAssumedPods || tt.expectInUnscheduledPods || tt.expectInAssignedPods { + t.Errorf("Expected pod group to exist, but pod has no SchedulingGroup") + } + return + } + + podGroupState, err := cache.PodGroupStates().Get(newPod.Namespace, *newPod.Spec.SchedulingGroup.PodGroupName) + if err != nil { + return + } + + _, inUnscheduledPods := podGroupState.UnscheduledPods()[newPod.Name] + if inUnscheduledPods != tt.expectInUnscheduledPods { + t.Errorf("expected pod in UnscheduledPods: %v, got %v", tt.expectInUnscheduledPods, inUnscheduledPods) + } + + if inAssignedPods := podGroupState.AssignedPods().Has(newPod.UID); inAssignedPods != tt.expectInAssignedPods { + t.Errorf("expected pod in AssignedPods: %v, got %v", tt.expectInAssignedPods, inAssignedPods) + } + + if inAssumedPods := podGroupState.AssumedPods().Has(newPod.UID); inAssumedPods != tt.expectInAssumedPods { + t.Errorf("expected pod in AssumedPods: %v, got %v", tt.expectInAssumedPods, inAssumedPods) + } + + if !tt.genericWorkloadEnabled { + return + } + + podGroupKey := newPodGroupKey(newPod.Namespace, *newPod.Spec.SchedulingGroup.PodGroupName) + gotPod := cache.podGroupStates[podGroupKey].allPods[newPod.UID] + if diff := cmp.Diff(tt.newPod, gotPod); diff != "" { + t.Errorf("stored pod does not match newPod (-want +got):\n%s", diff) + } + }) + } +} + +func Test_RemovePodGroupMember(t *testing.T) { + podGroupName := "pg" + pod1 := st.MakePod().Namespace("namespace").Name("unscheduled-pod").UID("pod1"). + PodGroupName(podGroupName).Obj() + pod2 := st.MakePod().Namespace("namespace").Name("assigned-pod").UID("pod2").Node("node"). 
+ PodGroupName(podGroupName).Obj() + + tests := []struct { + name string + initPods []*v1.Pod + podToDelete *v1.Pod + expectPodGroupStateCount int + genericWorkloadEnabled bool + }{ + { + name: "remove a pod from a group with multiple pods", + initPods: []*v1.Pod{pod1, pod2}, + podToDelete: pod1, + expectPodGroupStateCount: 1, + genericWorkloadEnabled: true, + }, + { + name: "remove a last pod from a group", + initPods: []*v1.Pod{pod1}, + podToDelete: pod1, + expectPodGroupStateCount: 0, + genericWorkloadEnabled: true, + }, + { + name: "remove a non-existent pod from a group should be a no-op", + podToDelete: pod1, + expectPodGroupStateCount: 0, + genericWorkloadEnabled: true, + }, + { + name: "remove a non-existent pod from a group should be a no-op", + podToDelete: pod1, + expectPodGroupStateCount: 0, + genericWorkloadEnabled: true, + }, + { + name: "remove a pod while generic workload disabled should be a no-op", + initPods: []*v1.Pod{pod1}, + expectPodGroupStateCount: 0, + podToDelete: pod1, + genericWorkloadEnabled: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cache := newCache(context.Background(), time.Second, nil, tt.genericWorkloadEnabled) + + for _, pod := range tt.initPods { + cache.AddPodGroupMember(pod) + } + + cache.RemovePodGroupMember(tt.podToDelete) + + podGroupStateCount := len(cache.podGroupStates) + if podGroupStateCount != tt.expectPodGroupStateCount { + t.Errorf("expected %d pod groups remaining, got %d", tt.expectPodGroupStateCount, podGroupStateCount) + } + + if podGroupStateCount == 0 { + return + } + + podGroupState, err := cache.PodGroupStates().Get(tt.podToDelete.Namespace, *tt.podToDelete.Spec.SchedulingGroup.PodGroupName) + if err != nil { + t.Fatalf("Unexpected error getting pod group state: %v", err) + } + + if podGroupState.AllPods().Has(tt.podToDelete.UID) { + t.Errorf("Expected pod %s to be deleted from pod group but it still exists", tt.podToDelete.UID) + } + }) + } +} + +// 
TestUpdatePodGroupStateSnapshot tests that pod group states of the snapshot have +// their data and generations updated properly. +func TestUpdatePodGroupStateSnapshot(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + cache := newCache(ctx, time.Second, nil, true) + + podGroupName1 := "pg1" + podGroupName2 := "pg2" + pod1 := st.MakePod().Namespace("ns").Name("pod1").UID("uid1").PodGroupName(podGroupName1).Obj() + pod2 := st.MakePod().Namespace("ns").Name("pod2").UID("uid2").PodGroupName(podGroupName1).Obj() + pod3 := st.MakePod().Namespace("ns").Name("pod3").UID("uid3").PodGroupName(podGroupName2).Obj() + + snapshot := NewEmptySnapshot() + + tests := []struct { + name string + action func() + expectedPods []*v1.Pod + }{ + { + name: "add a pod group member and update snapshot", + action: func() { cache.AddPodGroupMember(pod1) }, + expectedPods: []*v1.Pod{pod1}, + }, + { + name: "add a pod with different pod group and update snapshot", + action: func() { cache.AddPodGroupMember(pod3) }, + expectedPods: []*v1.Pod{pod1, pod3}, + }, + { + name: "remove a last pod group member and update snapshot", + action: func() { cache.RemovePodGroupMember(pod1) }, + expectedPods: []*v1.Pod{pod3}, + }, + { + name: "add a pod to a recently deleted pod group and update snapshot", + action: func() { cache.AddPodGroupMember(pod2) }, + expectedPods: []*v1.Pod{pod2, pod3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Capture cache generations before snapshot update to detect which pod groups are going to be modified. 
+ prevCacheGenerations := make(map[podGroupKey]int64, len(cache.podGroupStates)) + for key, pgs := range cache.podGroupStates { + prevCacheGenerations[key] = pgs.generation + } + + tt.action() + if err := cache.UpdateSnapshot(logger, snapshot); err != nil { + t.Fatalf("UpdateSnapshot failed: %v", err) + } + + // For each pod group that the action modified (its cache generation advanced), the snapshot generation must have advanced too. + // Unmodified pod groups keep their previous generation. + for key, pgs := range snapshot.podGroupStates { + cachePgs, ok := cache.podGroupStates[key] + if !ok { + continue + } + if cachePgs.generation > prevCacheGenerations[key] { + if pgs.generation <= prevCacheGenerations[key] { + t.Errorf("pod group %s was modified but snapshot generation (%d) was not incremented (%d)", key, pgs.generation, prevCacheGenerations[key]) + } + } + } + + expectedPodGroupStatesSnapshot := createPodGroupStates(tt.expectedPods) + if diff := cmp.Diff(expectedPodGroupStatesSnapshot, snapshot.podGroupStates, podGroupStateCmpOpts...); diff != "" { + t.Errorf("snapshot data mismatch (-want +got):\n%s", diff) + } + }) + } +} + +// Test_BindingPodGroupMember simulates binding and tests that when an assumed pod +// gets bound, its state within pod group transitions from assumed to assigned. +func Test_BindingPodGroupMember(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + cache := newCache(ctx, time.Second, nil, true) + podGroupName := "pg" + pod := st.MakePod().Namespace("namespace").Name("pod1").UID("pod1-uid"). + PodGroupName(podGroupName).Obj() + + // Simulate the informer firing an Add event for an unscheduled + // pod (no NodeName set) reflecting on PodGroupStates. + cache.AddPodGroupMember(pod) + + // Simulate the scheduler assuming the pod on a node. 
+ assumedPod := pod.DeepCopy() + assumedPod.Spec.NodeName = "node1" + if err := cache.AssumePod(logger, assumedPod); err != nil { + t.Fatalf("AssumePod failed: %v", err) + } + + podGroupState, err := cache.PodGroupStates().Get(pod.Namespace, podGroupName) + if err != nil { + t.Fatalf("Unexpected error getting pod group state after AssumePod: %v", err) + } + if !podGroupState.AssumedPods().Has(assumedPod.UID) { + t.Errorf("Expected pod to be in AssumedPods after AssumePod") + } + if podGroupState.AssignedPods().Has(assumedPod.UID) { + t.Errorf("Expected pod NOT to be in AssignedPods after AssumePod") + } + + // Simulate binding confirmation: the informer fires an Add event with NodeName set. + if err := cache.AddPod(logger, assumedPod); err != nil { + t.Fatalf("AddPod (binding confirmation) failed: %v", err) + } + + podGroupState, err = cache.PodGroupStates().Get(pod.Namespace, podGroupName) + if err != nil { + t.Fatalf("Unexpected error getting pod group state after AddPod: %v", err) + } + if podGroupState.AssumedPods().Has(assumedPod.UID) { + t.Errorf("Expected pod not to be in AssumedPods after binding confirmation") + } + if !podGroupState.AssignedPods().Has(assumedPod.UID) { + t.Errorf("Expected pod to be in AssignedPods after binding confirmation") + } +} + // TestRemovePod tests after added pod is removed, its information should also be subtracted. func TestRemovePod(t *testing.T) { pod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}) @@ -720,7 +1110,7 @@ func TestRemovePod(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() nodeName := pod.Spec.NodeName - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) // Add/Assume pod succeeds even before adding the nodes. 
if tt.assume { if err := cache.AddPod(logger, pod); err != nil { @@ -768,7 +1158,7 @@ func TestForgetPod(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for _, pod := range pods { if err := cache.AssumePod(logger, pod); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -981,7 +1371,7 @@ func TestNodeOperators(t *testing.T) { imageStates := buildImageStates(tc.nodes) expected := buildNodeInfo(node, tc.pods, imageStates) - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for _, nodeItem := range tc.nodes { cache.AddNode(logger, nodeItem) } @@ -1115,9 +1505,180 @@ func TestNodeOperators(t *testing.T) { } } +// TestPodGroupPodOperations tests that operations (Add, Update, Remove, Assume, Forget) on +// pods with pod group name properly update PodGroupStates only when GenericWorkload feature gate is enabled. +func TestPodGroupPodOperations(t *testing.T) { + groupName := "pg" + pod := st.MakePod().Namespace("test-ns").Name("pod-0").UID("uid-0"). 
+ PodGroupName(groupName).Obj() + + type state struct { + podGroupStatesCount int + assignedCount int + unscheduledCount int + assumedCount int + } + + tests := []struct { + name string + genericWorkloadEnabled bool + setup func(*testing.T, *cacheImpl, context.Context) + operation func(*testing.T, *cacheImpl, context.Context) + expected state + }{ + { + name: "AddPod with GenericWorkload disabled", + genericWorkloadEnabled: false, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 0}, + }, + { + name: "AddPod with GenericWorkload enabled", + genericWorkloadEnabled: true, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 1, unscheduledCount: 1, assignedCount: 0, assumedCount: 0}, + }, + { + name: "AssumePod with GenericWorkload disabled", + genericWorkloadEnabled: false, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("AssumePod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 0}, + }, + { + name: "AssumePod with GenericWorkload enabled", + genericWorkloadEnabled: true, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("AssumePod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 1, assignedCount: 0, unscheduledCount: 0, assumedCount: 1}, + }, + { + name: "ForgetPod with GenericWorkload disabled", + genericWorkloadEnabled: false, + setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil { + 
t.Fatalf("AssumePod failed: %v", err) + } + }, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.ForgetPod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("ForgetPod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 1, assignedCount: 0, unscheduledCount: 0, assumedCount: 1}, + }, + { + name: "ForgetPod with GenericWorkload enabled", + genericWorkloadEnabled: true, + setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("AssumePod failed: %v", err) + } + }, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.ForgetPod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("ForgetPod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 1, unscheduledCount: 1}, + }, + { + name: "RemovePod with GenericWorkload disabled", + genericWorkloadEnabled: false, + setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + }, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.RemovePod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("RemovePod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 1, assignedCount: 0, unscheduledCount: 1, assumedCount: 0}, + }, + { + name: "RemovePod with GenericWorkload enabled", + genericWorkloadEnabled: true, + setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + }, + operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) { + if err := cache.RemovePod(klog.FromContext(ctx), pod); err != nil { + t.Fatalf("RemovePod failed: %v", err) + } + }, + expected: state{podGroupStatesCount: 0}, + }, + } + + for _, tt := range tests { + 
t.Run(tt.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Initialize cache with feature gate enabled to ensure group state is + // properly established for operations that require it. + cache := newCache(ctx, time.Second, nil, true) + if tt.setup != nil { + tt.setup(t, cache, ctx) + } + cache.genericWorkloadEnabled = tt.genericWorkloadEnabled + tt.operation(t, cache, ctx) + + if count := len(cache.podGroupStates); count != tt.expected.podGroupStatesCount { + t.Errorf("expected %d pod group states, got %d", tt.expected.podGroupStatesCount, count) + } + + if tt.expected.podGroupStatesCount == 0 { + return + } + + pgs, err := cache.PodGroupStates().Get("test-ns", groupName) + if err != nil { + t.Fatalf("unexpected error getting pod group state: %v", err) + } + + assignedCount := pgs.AssignedPods().Len() + if assignedCount != tt.expected.assignedCount { + t.Errorf("expected %d pods in assignedPods, got %d", tt.expected.assignedCount, assignedCount) + } + + unscheduledCount := len(pgs.UnscheduledPods()) + if unscheduledCount != tt.expected.unscheduledCount { + t.Errorf("expected %d pods in unscheduledPods, got %d", tt.expected.unscheduledCount, unscheduledCount) + } + + assumedCount := pgs.AssumedPods().Len() + if assumedCount != tt.expected.assumedCount { + t.Errorf("expected %d pods in assumedPods, got %d", tt.expected.assumedCount, assumedCount) + } + }) + } +} + func TestSchedulerCache_UpdateSnapshot(t *testing.T) { logger, _ := ktesting.NewTestContext(t) + var podGroupName = "pg" + // Create a few nodes to be used in tests. 
var nodes []*v1.Node for i := 0; i < 10; i++ { @@ -1175,6 +1736,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { PVC(fmt.Sprintf("test-pvc%v", pvcID)).Node(fmt.Sprintf("test-node%v", node)).Obj() } + // Add a few pods with a pod group name + var podsWithPodGroupName []*v1.Pod + for i := range 20 { + pod := st.MakePod().Name(fmt.Sprintf("p-podgroup-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-podgroup-%v", i)). + PodGroupName(fmt.Sprintf("%s-%v", podGroupName, i)). + Node(fmt.Sprintf("test-node%v", i)).Obj() + podsWithPodGroupName = append(podsWithPodGroupName, pod) + } + var cache *cacheImpl var snapshot *Snapshot type operation = func(t *testing.T) @@ -1218,6 +1788,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } } + addPodWithPodGroupName := func(i int) operation { + return func(t *testing.T) { + if err := cache.AddPod(logger, podsWithPodGroupName[i]); err != nil { + t.Error(err) + } + } + } removePod := func(i int) operation { return func(t *testing.T) { if err := cache.RemovePod(logger, pods[i]); err != nil { @@ -1240,6 +1817,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } } + removePodWithPodGroupName := func(i int) operation { + return func(t *testing.T) { + if err := cache.RemovePod(logger, podsWithPodGroupName[i]); err != nil { + t.Error(err) + } + } + } updatePod := func(i int) operation { return func(t *testing.T) { if err := cache.UpdatePod(logger, pods[i], updatedPods[i]); err != nil { @@ -1247,6 +1831,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } } + updatePodWithPodGroupName := func(i int) operation { + return func(t *testing.T) { + if err := cache.UpdatePod(logger, podsWithPodGroupName[i], podsWithPodGroupName[i]); err != nil { + t.Error(err) + } + } + } assumePod := func(i int) operation { return func(t *testing.T) { if err := cache.AssumePod(logger, pods[i]); err != nil { @@ -1348,11 +1939,12 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } tests := []struct { - name 
string - operations []operation - expected []*v1.Node - expectedHavePodsWithAffinity int - expectedUsedPVCSet sets.Set[string] + name string + operations []operation + expected []*v1.Node + expectedHavePodsWithAffinity int + expectedPodGroupStatesSnapshot map[podGroupKey]*podGroupStateSnapshot + expectedUsedPVCSet sets.Set[string] }{ { name: "Empty cache", @@ -1547,6 +2139,33 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { expected: []*v1.Node{nodes[1], nodes[0]}, expectedUsedPVCSet: sets.New("test-ns/test-pvc2", "test-ns/test-pvc3"), }, + { + name: "Add, Update and Remove multiple pods with SchedulingGroup", + operations: []operation{ + addNode(0), addNode(1), addNode(2), addPodWithPodGroupName(0), addPodWithPodGroupName(1), + addPodWithPodGroupName(2), updateSnapshot(), + updatePodWithPodGroupName(0), removePodWithPodGroupName(1), updateSnapshot(), + }, + expected: []*v1.Node{nodes[1], nodes[0], nodes[2]}, + expectedPodGroupStatesSnapshot: map[podGroupKey]*podGroupStateSnapshot{ + newPodGroupKey("test-ns", "pg-0"): { + podGroupStateData: podGroupStateData{ + allPods: map[types.UID]*v1.Pod{"puid-podgroup-0": podsWithPodGroupName[0]}, + assignedPods: sets.New[types.UID]("puid-podgroup-0"), + unscheduledPods: sets.New[types.UID](), + assumedPods: sets.New[types.UID](), + }, + }, + newPodGroupKey("test-ns", "pg-2"): { + podGroupStateData: podGroupStateData{ + allPods: map[types.UID]*v1.Pod{"puid-podgroup-2": podsWithPodGroupName[2]}, + assignedPods: sets.New[types.UID]("puid-podgroup-2"), + unscheduledPods: sets.New[types.UID](), + assumedPods: sets.New[types.UID](), + }, + }, + }, + }, { name: "Add and Remove multiple pods with PVC", operations: []operation{ @@ -1596,7 +2215,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache = newCache(ctx, time.Second, nil) + cache = newCache(ctx, time.Second, nil, true) snapshot = NewEmptySnapshot() for _, op := 
range test.operations { @@ -1619,6 +2238,11 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i) } + // Check pod group states in the snapshot. + if diff := cmp.Diff(test.expectedPodGroupStatesSnapshot, snapshot.podGroupStates, podGroupStateCmpOpts...); diff != "" { + t.Errorf("unexpected podGroupStates in snapshot (-want, +got):\n%s", diff) + } + // Check number of nodes with pods with affinity if len(snapshot.havePodsWithAffinityNodeInfoList) != test.expectedHavePodsWithAffinity { t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList)) @@ -1830,7 +2454,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache = newCache(ctx, time.Second, nil) + cache = newCache(ctx, time.Second, nil, false) snapshot = NewEmptySnapshot() test.operations(t) @@ -1911,7 +2535,7 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache { logger, ctx := ktesting.NewTestContext(b) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := newCache(ctx, time.Second, nil) + cache := newCache(ctx, time.Second, nil, false) for i := 0; i < 1000; i++ { nodeName := fmt.Sprintf("node-%d", i) cache.AddNode(logger, st.MakeNode().Name(nodeName).Obj()) diff --git a/pkg/scheduler/backend/cache/interface.go b/pkg/scheduler/backend/cache/interface.go index ad3d7d94d4c..f4403d917c6 100644 --- a/pkg/scheduler/backend/cache/interface.go +++ b/pkg/scheduler/backend/cache/interface.go @@ -20,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + fwk "k8s.io/kube-scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -107,6 +108,18 @@ type Cache interface { // BindPod handles the pod 
binding by adding a bind API call to the dispatcher. // This method should be used only if the SchedulerAsyncAPICalls feature gate is enabled. BindPod(binding *v1.Binding) (<-chan error, error) + + // PodGroupStates returns a PodGroupStateLister. + PodGroupStates() fwk.PodGroupStateLister + + // AddPodGroupMember adds not assigned and not assumed pod to its pod group state. + AddPodGroupMember(pod *v1.Pod) + + // UpdatePodGroupMember updates a pod in its pod group state. + UpdatePodGroupMember(logger klog.Logger, oldPod, newPod *v1.Pod) + + // RemovePodGroupMember removes a pod from its pod group state. + RemovePodGroupMember(pod *v1.Pod) } // Dump is a dump of the cache state. diff --git a/pkg/scheduler/backend/cache/podgroupstate.go b/pkg/scheduler/backend/cache/podgroupstate.go new file mode 100644 index 00000000000..a5c8b43dff8 --- /dev/null +++ b/pkg/scheduler/backend/cache/podgroupstate.go @@ -0,0 +1,354 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "maps" + "sync" + "sync/atomic" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" +) + +var generation atomic.Int64 + +// nextPodGroupGeneration increments generation numbers monotonically for a pod group state (instead of per-instance increment) +// to prevent generation reset or collision when a pod group is deleted and recreated with the same name. 
+func nextPodGroupGeneration() int64 {
+	return generation.Add(1)
+}
+
+// podGroupKey identifies a pod group by its namespace and name.
+// Both fields are plain strings, so the key is comparable and usable as a map key.
+type podGroupKey struct {
+	name      string
+	namespace string
+}
+
+// GetName returns the pod group name.
+func (k podGroupKey) GetName() string {
+	return k.name
+}
+
+// GetNamespace returns the pod group namespace.
+func (k podGroupKey) GetNamespace() string {
+	return k.namespace
+}
+
+// String renders the key as "<namespace>/<name>".
+func (k podGroupKey) String() string {
+	return k.GetNamespace() + "/" + k.name
+}
+
+// Compile-time assertion that podGroupKey implements klog.KMetadata.
+var _ klog.KMetadata = (*podGroupKey)(nil)
+
+// newPodGroupKey builds the key for the pod group with the given namespace and name.
+func newPodGroupKey(namespace string, name string) podGroupKey {
+	return podGroupKey{name: name, namespace: namespace}
+}
+
+// podGroupStateData holds data and functionality shared between podGroupState and podGroupStateSnapshot.
+type podGroupStateData struct {
+	// generation gets bumped whenever the data is changed.
+	// It's used to detect changes and avoid unnecessary cloning when taking a snapshot.
+	generation int64
+	// allPods maps the UID of every pod of the group known to the scheduler to its object.
+	allPods map[types.UID]*v1.Pod
+	// unscheduledPods holds the UIDs of pods that are neither assumed nor assigned.
+	unscheduledPods sets.Set[types.UID]
+	// assumedPods holds the UIDs of pods that have reached the Reserve stage and are
+	// waiting for the rest of the gang to arrive before being allowed to bind.
+	assumedPods sets.Set[types.UID]
+	// assignedPods holds the UIDs of pods of the group that are assigned (bound).
+	assignedPods sets.Set[types.UID]
+}
+
+// newPodGroupStateData returns an empty podGroupStateData with its map and all
+// three sets initialized, so they can be written to right away.
+func newPodGroupStateData() podGroupStateData {
+	var data podGroupStateData
+	data.allPods = map[types.UID]*v1.Pod{}
+	data.unscheduledPods = sets.New[types.UID]()
+	data.assumedPods = sets.New[types.UID]()
+	data.assignedPods = sets.New[types.UID]()
+	return data
+}
+
+// addPod adds the pod to this group.
+// Depending on the NodeName, it can insert the pod into either assignedPods or unscheduledPods.
+func (d *podGroupStateData) addPod(pod *v1.Pod) {
+	d.generation = nextPodGroupGeneration()
+	d.allPods[pod.UID] = pod
+	// A pod that already carries a node name is assigned; any other pod starts out unscheduled.
+	target := d.unscheduledPods
+	if pod.Spec.NodeName != "" {
+		target = d.assignedPods
+	}
+	target.Insert(pod.UID)
+}
+
+// updatePod stores newPod as the current object for newPod.UID.
+// Set membership changes only when the update is a binding, i.e. the node name goes
+// from empty in oldPod to non-empty in newPod; the pod is then moved to assignedPods.
+func (d *podGroupStateData) updatePod(oldPod, newPod *v1.Pod) {
+	d.generation = nextPodGroupGeneration()
+	uid := newPod.UID
+	d.allPods[uid] = newPod
+	justBound := oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != ""
+	if !justBound {
+		return
+	}
+	d.assignedPods.Insert(uid)
+	// An assigned pod is no longer unscheduled or merely assumed.
+	d.unscheduledPods.Delete(uid)
+	d.assumedPods.Delete(uid)
+}
+
+// deletePod drops the pod from allPods and from every membership set.
+func (d *podGroupStateData) deletePod(podUID types.UID) {
+	d.generation = nextPodGroupGeneration()
+	delete(d.allPods, podUID)
+	for _, uids := range []sets.Set[types.UID]{d.unscheduledPods, d.assumedPods, d.assignedPods} {
+		uids.Delete(podUID)
+	}
+}
+
+// assumePod marks a pod as assumed within the pod group state.
+// A UID that is not tracked in allPods is ignored and does not bump the generation.
+func (d *podGroupStateData) assumePod(podUID types.UID) {
+	pod := d.allPods[podUID]
+	if pod == nil {
+		// The pod may have been removed from the cluster while it was being scheduled.
+		return
+	}
+
+	d.generation = nextPodGroupGeneration()
+	d.unscheduledPods.Delete(pod.UID)
+	// A stored pod that already has a node name counts as assigned rather than assumed.
+	if pod.Spec.NodeName == "" {
+		d.assumedPods.Insert(pod.UID)
+		return
+	}
+	d.assignedPods.Insert(pod.UID)
+}
+
+// forgetPod moves a pod back from the assumed state to unscheduled within the pod group state.
+// A UID that is not tracked in allPods is ignored and does not bump the generation.
+func (d *podGroupStateData) forgetPod(podUID types.UID) {
+	pod := d.allPods[podUID]
+	if pod == nil {
+		// The pod may have been removed from the cluster while it was being scheduled.
+		return
+	}
+
+	d.generation = nextPodGroupGeneration()
+	d.assumedPods.Delete(podUID)
+
+	// A stored pod that already has a node name stays in assignedPods;
+	// only a pod without a node name goes back to unscheduledPods.
+	if pod.Spec.NodeName == "" {
+		d.unscheduledPods.Insert(podUID)
+		return
+	}
+	d.assignedPods.Insert(podUID)
+}
+
+// scheduledPods returns the pods that are either assumed or assigned for this pod group.
+// Assigned pods are listed before assumed ones; the order inside each set is the
+// iteration order of the set and therefore not stable.
+func (d *podGroupStateData) scheduledPods() []*v1.Pod {
+	pods := make([]*v1.Pod, 0, d.scheduledPodsCount())
+	for _, uids := range []sets.Set[types.UID]{d.assignedPods, d.assumedPods} {
+		for uid := range uids {
+			pods = append(pods, d.allPods[uid])
+		}
+	}
+	return pods
+}
+
+// empty returns true when the pod group state contains no pods.
+func (d *podGroupStateData) empty() bool {
+	return d.allPodsCount() == 0
+}
+
+// allPodsCount returns the number of all pods known to the scheduler for this group.
+func (d *podGroupStateData) allPodsCount() int {
+	return len(d.allPods)
+}
+
+// scheduledPodsCount returns the number of pods for this group that are either assumed or assigned.
+func (d *podGroupStateData) scheduledPodsCount() int {
+	return len(d.assignedPods) + len(d.assumedPods)
+}
+
+// deepCopy returns a copy whose map and sets are independent of the receiver's.
+// The *v1.Pod values themselves are shared with the receiver, not cloned.
+func (d *podGroupStateData) deepCopy() podGroupStateData {
+	clone := podGroupStateData{generation: d.generation}
+	clone.allPods = maps.Clone(d.allPods)
+	clone.unscheduledPods = d.unscheduledPods.Clone()
+	clone.assumedPods = d.assumedPods.Clone()
+	clone.assignedPods = d.assignedPods.Clone()
+	return clone
+}
+
+// unscheduledPodsMap returns all unscheduled pods for this pod group.
+func (d *podGroupStateData) unscheduledPodsMap() map[string]*v1.Pod {
+	// The result is keyed by pod name, not UID.
+	byName := make(map[string]*v1.Pod, d.unscheduledPods.Len())
+	for uid := range d.unscheduledPods {
+		p := d.allPods[uid]
+		byName[p.Name] = p
+	}
+	return byName
+}
+
+// podGroupState holds the runtime state of a pod group.
+type podGroupState struct {
+	// lock is read-locked by the exported accessors of podGroupState.
+	// NOTE(review): the mutators promoted from podGroupStateData do not take it;
+	// presumably writers are serialized by the cache lock - confirm against the cache.
+	lock sync.RWMutex
+	podGroupStateData
+}
+
+// newPodGroupState returns an empty pod group state with initialized data.
+func newPodGroupState() *podGroupState {
+	pgs := new(podGroupState)
+	pgs.podGroupStateData = newPodGroupStateData()
+	return pgs
+}
+
+// snapshot copies the live pod group state into a podGroupStateSnapshot.
+// The copy is made with deepCopy and keeps the generation of the live state.
+// It must be called under the cache lock.
+func (pgs *podGroupState) snapshot() *podGroupStateSnapshot {
+	snap := new(podGroupStateSnapshot)
+	snap.podGroupStateData = pgs.deepCopy()
+	return snap
+}
+
+// empty returns true when the group contains no pods.
+// It must be called under the cache lock.
+func (pgs *podGroupState) empty() bool {
+	return len(pgs.allPods) == 0
+}
+
+// forgetPod moves a pod back from the assumed state to unscheduled.
+// It must be called under the cache lock.
+func (pgs *podGroupState) forgetPod(podUID types.UID) {
+	data := &pgs.podGroupStateData
+	data.forgetPod(podUID)
+}
+
+// AllPods returns the UIDs of all pods known to the scheduler for this group.
+// The returned set is newly built from the keys of allPods.
+func (pgs *podGroupState) AllPods() sets.Set[types.UID] {
+	pgs.lock.RLock()
+	defer pgs.lock.RUnlock()
+
+	return sets.KeySet(pgs.allPods)
+}
+
+// AllPodsCount returns the number of all pods known to the scheduler for this group.
+func (pgs *podGroupState) AllPodsCount() int {
+	pgs.lock.RLock()
+	defer pgs.lock.RUnlock()
+
+	return pgs.allPodsCount()
+}
+
+// UnscheduledPods returns all pods that are unscheduled for this group,
+// i.e., are neither assumed nor assigned.
+// The returned map type corresponds to the argument of the PodActivator.Activate method.
+func (pgs *podGroupState) UnscheduledPods() map[string]*v1.Pod {
+	pgs.lock.RLock()
+	defer pgs.lock.RUnlock()
+
+	return pgs.unscheduledPodsMap()
+}
+
+// AssumedPods returns a copy of the UIDs of all pods for this group in the assumed state,
+// i.e., that have passed the Reserve stage.
+func (pgs *podGroupState) AssumedPods() sets.Set[types.UID] {
+	pgs.lock.RLock()
+	defer pgs.lock.RUnlock()
+
+	return pgs.assumedPods.Clone()
+}
+
+// AssignedPods returns a copy of the UIDs of all pods already assigned (bound) for this group.
+func (pgs *podGroupState) AssignedPods() sets.Set[types.UID] {
+	pgs.lock.RLock()
+	defer pgs.lock.RUnlock()
+
+	return pgs.assignedPods.Clone()
+}
+
+// ScheduledPods returns the pods that are either assumed or assigned for this pod group.
+func (pgs *podGroupState) ScheduledPods() []*v1.Pod {
+	pgs.lock.RLock()
+	defer pgs.lock.RUnlock()
+
+	return pgs.scheduledPods()
+}
+
+// ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned.
+func (pgs *podGroupState) ScheduledPodsCount() int {
+	pgs.lock.RLock()
+	defer pgs.lock.RUnlock()
+
+	return pgs.scheduledPodsCount()
+}
+
+// podGroupStateSnapshot is a point-in-time copy of a podGroupState.
+// It is taken before a pod group scheduling cycle and used to track states of pods
+// during the cycle without modifying the live state of pods.
+// Unlike podGroupState it carries no lock.
+type podGroupStateSnapshot struct {
+	podGroupStateData
+}
+
+// assumePod marks a pod within the pod group state snapshot as assumed.
+func (snap *podGroupStateSnapshot) assumePod(podUID types.UID) {
+	data := &snap.podGroupStateData
+	data.assumePod(podUID)
+}
+
+// forgetPod removes a pod from the assumed state within the snapshot.
+func (snap *podGroupStateSnapshot) forgetPod(podUID types.UID) {
+	data := &snap.podGroupStateData
+	data.forgetPod(podUID)
+}
+
+// AllPods returns the UIDs of all pods known to the scheduler for this group.
+func (s *podGroupStateSnapshot) AllPods() sets.Set[types.UID] { + return sets.KeySet(s.podGroupStateData.allPods) +} + +// UnscheduledPods returns all pods that are unscheduled for this group. +func (s *podGroupStateSnapshot) UnscheduledPods() map[string]*v1.Pod { + return s.podGroupStateData.unscheduledPodsMap() +} + +// AssumedPods returns the UIDs of all assumed pods for this group. +func (s *podGroupStateSnapshot) AssumedPods() sets.Set[types.UID] { + return s.podGroupStateData.assumedPods +} + +// AssignedPods returns the UIDs of all assigned (bound) pods for this group. +func (s *podGroupStateSnapshot) AssignedPods() sets.Set[types.UID] { + return s.podGroupStateData.assignedPods +} + +// ScheduledPods returns the pods that are either assumed or assigned for this pod group. +func (s *podGroupStateSnapshot) ScheduledPods() []*v1.Pod { + return s.podGroupStateData.scheduledPods() +} + +// AllPodsCount returns the number of all pods known to the scheduler for this group. +func (s *podGroupStateSnapshot) AllPodsCount() int { + return s.podGroupStateData.allPodsCount() +} + +// ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned. +func (s *podGroupStateSnapshot) ScheduledPodsCount() int { + return s.podGroupStateData.scheduledPodsCount() +} diff --git a/pkg/scheduler/backend/podgroupmanager/podgroupstate_test.go b/pkg/scheduler/backend/cache/podgroupstate_test.go similarity index 63% rename from pkg/scheduler/backend/podgroupmanager/podgroupstate_test.go rename to pkg/scheduler/backend/cache/podgroupstate_test.go index 95ce3422112..afec094a3db 100644 --- a/pkg/scheduler/backend/podgroupmanager/podgroupstate_test.go +++ b/pkg/scheduler/backend/cache/podgroupstate_test.go @@ -14,14 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package podgroupmanager +package cache import ( "testing" - "time" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" st "k8s.io/kubernetes/pkg/scheduler/testing" - "k8s.io/utils/ptr" ) func TestPodGroupState_AssumeForget(t *testing.T) { @@ -36,7 +36,7 @@ func TestPodGroupState_AssumeForget(t *testing.T) { t.Fatal("Pod should be initially in UnscheduledPods") } - pgs.AssumePod(pod.UID) + pgs.assumePod(pod.UID) if !pgs.AssumedPods().Has(pod.UID) { t.Fatal("Pod should be in AssumedPods after AssumePod") } @@ -44,7 +44,7 @@ func TestPodGroupState_AssumeForget(t *testing.T) { t.Fatal("UnscheduledPods should be empty after AssumePod") } - pgs.ForgetPod(pod.UID) + pgs.forgetPod(pod.UID) if pgs.AssumedPods().Has(pod.UID) { t.Fatal("Pod should not be in AssumedPods after ForgetPod") } @@ -53,41 +53,52 @@ func TestPodGroupState_AssumeForget(t *testing.T) { } } -func TestPodGroupState_SchedulingTimeout(t *testing.T) { +func TestPodGroupState_Clone(t *testing.T) { pgs := newPodGroupState() - timeout := pgs.SchedulingTimeout() - if pgs.schedulingDeadline == nil { - t.Fatal("Scheduling deadline should be set after SchedulingTimeout call, but is nil") - } - if timeout <= 0 { - t.Errorf("Expected positive timeout duration, got %v", timeout) + pod1 := st.MakePod().Namespace("ns1").Name("p1").UID("p1"). + PodGroupName("pg").Obj() + pod2 := st.MakePod().Namespace("ns1").Name("p2").UID("p2"). + PodGroupName("pg").Obj() + + pgs.addPod(pod1) + pgs.addPod(pod2) + pgs.assumePod(pod2.UID) + + snap := pgs.snapshot() + + // Clone has the same generation. + if snap.generation != pgs.generation { + t.Errorf("expected clone generation %d, got %d", pgs.generation, snap.generation) } - // Sleep for a while to ensure that the time has increased, - // especially when testing on Windows machines with lower resolution. 
- time.Sleep(10 * time.Millisecond) - - deadline := *pgs.schedulingDeadline - newTimeout := pgs.SchedulingTimeout() - if !deadline.Equal(*pgs.schedulingDeadline) { - t.Errorf("Previous deadline should not be changed: previous: %v, current: %v", deadline, *pgs.schedulingDeadline) - } - if newTimeout >= timeout { - t.Errorf("Expected lower timeout duration: previous: %v, current: %v", timeout, newTimeout) + // Clone contains both pods. + if !snap.AllPods().Has(pod1.UID) || !snap.AllPods().Has(pod2.UID) { + t.Error("expected both pods in clone's AllPods") } - // Sleep for a while to ensure that the time has increased, - // especially when testing on Windows machines with lower resolution. - time.Sleep(10 * time.Millisecond) - - pgs.schedulingDeadline = ptr.To(time.Now().Add(-1 * time.Second)) - newTimeout = pgs.SchedulingTimeout() - if deadline.Equal(*pgs.schedulingDeadline) { - t.Error("Deadline should be reset after it has expired, but it wasn't") + // Clone preserves pod1 as unscheduled. + if _, ok := snap.UnscheduledPods()[pod1.Name]; !ok { + t.Error("expected pod1 in clone's UnscheduledPods") } - if newTimeout <= 0 { - t.Errorf("Expected positive timeout duration after reset, got %v", timeout) + + // Clone preserves pod2 as assumed. + if !snap.AssumedPods().Has(pod2.UID) { + t.Error("expected pod2 in clone's AssumedPods") + } + + // Mutating the clone does not affect the original. + snap.assumePod(pod1.UID) + if pgs.assumedPods.Has(pod1.UID) { + t.Error("mutation to clone should not affect original's assumedPods") + } + + // Mutating the original does not affect the clone. + pod3 := st.MakePod().Namespace("ns1").Name("p3").UID("p3"). + PodGroupName("pg").Obj() + pgs.addPod(pod3) + if snap.AllPods().Has(pod3.UID) { + t.Error("mutation to original should not affect clone's AllPods") } } @@ -121,7 +132,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { } // Assuming a pod should move it from unscheduled to assumed, increasing the count of scheduled pods. 
- pgs.AssumePod(pod1.UID) + pgs.assumePod(pod1.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -130,7 +141,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { } // Assuming a pod that is already scheduled should not change the counts. - pgs.AssumePod(pod3.UID) + pgs.assumePod(pod3.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -139,7 +150,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { } // Assuming a pod that is not in the state should not change the counts. - pgs.AssumePod(pod4.UID) + pgs.assumePod(pod4.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -148,7 +159,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { } // Forgetting a pod that is already scheduled should not change the counts. - pgs.ForgetPod(pod3.UID) + pgs.forgetPod(pod3.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -158,7 +169,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { // Forgetting a pod that is in the assumed state should move it back to unscheduled, // decreasing the count of scheduled pods. - pgs.ForgetPod(pod1.UID) + pgs.forgetPod(pod1.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -167,7 +178,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { } // Forgetting a pod that is not assumed should not change the counts. - pgs.ForgetPod(pod1.UID) + pgs.forgetPod(pod1.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -176,7 +187,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { } // Assuming a pod again should move it back to assumed, increasing the count of scheduled pods. 
- pgs.AssumePod(pod2.UID) + pgs.assumePod(pod2.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -185,7 +196,7 @@ func TestPodGroupState_PodCounts(t *testing.T) { } // Forgetting a pod that is not in the state should not change the counts. - pgs.ForgetPod(pod4.UID) + pgs.forgetPod(pod4.UID) if count := pgs.AllPodsCount(); count != 3 { t.Errorf("Expected AllPodsCount to be 3, got %d", count) } @@ -193,3 +204,37 @@ func TestPodGroupState_PodCounts(t *testing.T) { t.Errorf("Expected ScheduledPodsCount to be 2, got %d", count) } } + +// TestPodGroupState_ScheduledPods tests that ScheduledPods returns pods that +// are currently either assumed or assigned altogether. +func TestPodGroupState_ScheduledPods(t *testing.T) { + + pgs := newPodGroupState() + unscheduledPod := st.MakePod().Namespace("ns").Name("p1").UID("p1"). + PodGroupName("pg").Obj() + assumedPod := st.MakePod().Namespace("ns").Name("p2").UID("p2"). + PodGroupName("pg").Obj() + assignedPod := st.MakePod().Namespace("ns").Name("p3").UID("p3").Node("node1"). 
+ PodGroupName("pg").Obj() + + pgs.addPod(assignedPod) + pgs.addPod(unscheduledPod) + pgs.addPod(assumedPod) + + pgs.assumePod(assumedPod.UID) + scheduledPods := pgs.ScheduledPods() + + snapshot := pgs.snapshot() + pgs.assumePod(unscheduledPod.UID) + snapshotScheduledPods := snapshot.ScheduledPods() + + expectedScheduledPods := []*v1.Pod{assignedPod, assumedPod} + + if diff := cmp.Diff(expectedScheduledPods, scheduledPods); diff != "" { + t.Errorf("unexpected ScheduledPods result (-want,+got):\n%s", diff) + } + + if diff := cmp.Diff(expectedScheduledPods, snapshotScheduledPods); diff != "" { + t.Errorf("unexpected snapshot ScheduledPods result (-want,+got):\n%s", diff) + } +} diff --git a/pkg/scheduler/backend/cache/snapshot.go b/pkg/scheduler/backend/cache/snapshot.go index 06a6a3df900..358bc1177ae 100644 --- a/pkg/scheduler/backend/cache/snapshot.go +++ b/pkg/scheduler/backend/cache/snapshot.go @@ -22,8 +22,10 @@ import ( v1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -57,12 +59,15 @@ type Snapshot struct { // assumedPods maps a pod key to an assumed pod object during a single pod group scheduling cycle. // This map should be emptied before the next cycle starts. assumedPods map[string]*v1.Pod - + // podGroupStates maps a pod group key to a snapshot of its state, used during a pod group scheduling cycle. + podGroupStates map[podGroupKey]*podGroupStateSnapshot // placementNodes stores nodes that are present in the current placement. // If placement is not set, this is nil. // It should only be set in the pod group scheduling cycle, when checking if pod group can be scheduled within the placement. // This field should be cleared once the pod group has been checked for the placement. 
placementNodes *placementNodes + // genericWorkloadEnabled stores the GenericWorkload feature gate value. + genericWorkloadEnabled bool } var _ fwk.SharedLister = &Snapshot{} @@ -70,9 +75,11 @@ var _ fwk.SharedLister = &Snapshot{} // NewEmptySnapshot initializes a Snapshot struct and returns it. func NewEmptySnapshot() *Snapshot { return &Snapshot{ - nodeInfoMap: make(map[string]*framework.NodeInfo), - usedPVCSet: sets.New[string](), - assumedPods: make(map[string]*v1.Pod), + nodeInfoMap: make(map[string]*framework.NodeInfo), + usedPVCSet: sets.New[string](), + assumedPods: make(map[string]*v1.Pod), + podGroupStates: make(map[podGroupKey]*podGroupStateSnapshot), + genericWorkloadEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload), } } @@ -98,10 +105,31 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot { s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList s.usedPVCSet = createUsedPVCSet(pods) + if s.genericWorkloadEnabled { + s.podGroupStates = createPodGroupStates(pods) + } return s } +// createPodGroupStates builds the initial pod group state snapshot map from a list of pods. +func createPodGroupStates(pods []*v1.Pod) map[podGroupKey]*podGroupStateSnapshot { + podGroupStates := make(map[podGroupKey]*podGroupStateSnapshot) + for _, pod := range pods { + if pod.Spec.SchedulingGroup == nil { + continue + } + key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName) + pgs, ok := podGroupStates[key] + if !ok { + pgs = &podGroupStateSnapshot{podGroupStateData: newPodGroupStateData()} + podGroupStates[key] = pgs + } + pgs.addPod(pod) + } + return podGroupStates +} + // createNodeInfoMap obtains a list of pods and pivots that list into a map // where the keys are node names and the values are the aggregated information // for that node. 
@@ -188,6 +216,27 @@ func (s *Snapshot) StorageInfos() fwk.StorageInfoLister { return s } +// PodGroupStates returns a PodGroupStateLister. +func (s *Snapshot) PodGroupStates() fwk.PodGroupStateLister { + return &podGroupStateSnapshotLister{podGroupStates: s.podGroupStates} +} + +var _ fwk.PodGroupStateLister = &podGroupStateSnapshotLister{} + +type podGroupStateSnapshotLister struct { + podGroupStates map[podGroupKey]*podGroupStateSnapshot +} + +// Get returns the pod group state from the snapshot for the given pod group. +func (l *podGroupStateSnapshotLister) Get(namespace string, podGroupName string) (fwk.PodGroupState, error) { + key := newPodGroupKey(namespace, podGroupName) + state, ok := l.podGroupStates[key] + if !ok { + return nil, fmt.Errorf("pod group state not found for pod group %s", key) + } + return state, nil +} + // NumNodesInPlacement returns the number of nodes in the snapshot for the current placement. // If no placement is set, it returns the number of nodes in the snapshot. // This function is not thread safe so it should be executed when no other routines can write to the snapshot. @@ -247,6 +296,14 @@ func (s *Snapshot) AssumePod(podInfo *framework.PodInfo) error { nodeInfo.AddPodInfo(podInfo) nodeInfo.Generation = oldGeneration s.assumedPods[key] = pod + // Update the pod group state in the snapshot if the pod belongs to a pod group. + if !s.genericWorkloadEnabled || pod.Spec.SchedulingGroup == nil { + return nil + } + pgKey := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName) + if pgs, ok := s.podGroupStates[pgKey]; ok { + pgs.assumePod(pod.UID) + } return nil } @@ -277,6 +334,14 @@ func (s *Snapshot) ForgetPod(logger klog.Logger, pod *v1.Pod) error { delete(s.nodeInfoMap, nodeName) } } + // Update the pod group state in the snapshot if the pod belongs to a pod group. 
+ if !s.genericWorkloadEnabled || assumedPod.Spec.SchedulingGroup == nil { + return nil + } + pgKey := newPodGroupKey(assumedPod.Namespace, *assumedPod.Spec.SchedulingGroup.PodGroupName) + if pgs, ok := s.podGroupStates[pgKey]; ok { + pgs.forgetPod(assumedPod.UID) + } return nil } diff --git a/pkg/scheduler/backend/podgroupmanager/podgroupmanager.go b/pkg/scheduler/backend/podgroupmanager/podgroupmanager.go deleted file mode 100644 index fddb48a2190..00000000000 --- a/pkg/scheduler/backend/podgroupmanager/podgroupmanager.go +++ /dev/null @@ -1,134 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package podgroupmanager - -import ( - "fmt" - "sync" - - v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2" - fwk "k8s.io/kube-scheduler/framework" -) - -// PodGroupManager is the central source of truth for the state of pods belonging to PodGroup objects. -// It is designed to be driven explicitly by the scheduler's event handlers to ensure thread safety -// and avoid race conditions with the main scheduling queue. -// Note: The current implementation assumes that pod.Spec.SchedulingGroup is immutable. -// Allowing mutability would require changes to the manager, e.g., by properly handling pod updates. -type PodGroupManager interface { - fwk.PodGroupManager - - // AddPod is called by the scheduler when a Pod/Add event is observed. - AddPod(pod *v1.Pod) - // UpdatePod is called by the scheduler when a Pod/Update event is observed. 
- UpdatePod(oldPod, newPod *v1.Pod) - // DeletePod is called by the scheduler when a Pod/Delete event is observed. - DeletePod(pod *v1.Pod) -} - -// podGroupManager is the concrete implementation of the PodGroupManager. -type podGroupManager struct { - lock sync.RWMutex - - // podGroupStates stores the runtime state for each known pod group. - podGroupStates map[podGroupKey]*podGroupState - - logger klog.Logger -} - -// New initializes a new pod group manager and returns it. -func New(logger klog.Logger) *podGroupManager { - return &podGroupManager{ - podGroupStates: make(map[podGroupKey]*podGroupState), - logger: logger, - } -} - -// AddPod adds a pod to the pod group manager if it has a scheduling group. -// Pod is added to the available pods set for its corresponding pod group. -func (pgm *podGroupManager) AddPod(pod *v1.Pod) { - if pod.Spec.SchedulingGroup == nil { - return - } - pgm.lock.Lock() - defer pgm.lock.Unlock() - - key := newPodGroupKey(pod.Namespace, pod.Spec.SchedulingGroup) - state, ok := pgm.podGroupStates[key] - if !ok { - state = newPodGroupState() - pgm.podGroupStates[key] = state - } - state.addPod(pod) -} - -// UpdatePod updates a pod in the pod group manager if it has a scheduling group. -// Note: The current implementation assumes that newPod.Spec.SchedulingGroup is immutable. -func (pgm *podGroupManager) UpdatePod(oldPod, newPod *v1.Pod) { - if newPod.Spec.SchedulingGroup == nil { - return - } - pgm.lock.Lock() - defer pgm.lock.Unlock() - - key := newPodGroupKey(newPod.Namespace, newPod.Spec.SchedulingGroup) - state, ok := pgm.podGroupStates[key] - if !ok { - // Shouldn't happen, but handling this case gracefully. - state = newPodGroupState() - pgm.podGroupStates[key] = state - state.addPod(newPod) - pgm.logger.Error(nil, "UpdatePod found no existing PodGroup for pod. 
Created new PodGroup for the pod", "pod", klog.KObj(newPod), "podGroupKey", klog.KObj(key)) - return - } - state.updatePod(oldPod, newPod) -} - -// DeletePod removes a pod from the pod group manager if it has a scheduling group. -// Pod is removed from the pods sets for its corresponding pod group. -func (pgm *podGroupManager) DeletePod(pod *v1.Pod) { - if pod.Spec.SchedulingGroup == nil { - return - } - pgm.lock.Lock() - defer pgm.lock.Unlock() - - key := newPodGroupKey(pod.Namespace, pod.Spec.SchedulingGroup) - state, ok := pgm.podGroupStates[key] - if !ok { - // The pod group may have already been cleaned up, or the pod was never added. - return - } - state.deletePod(pod.UID) - // Clean up the map entry if no pods are left in the group. - if state.empty() { - delete(pgm.podGroupStates, key) - } -} - -// PodGroupState returns the runtime state of a pod group. -func (pgm *podGroupManager) PodGroupState(namespace string, schedulingGroup *v1.PodSchedulingGroup) (fwk.PodGroupState, error) { - pgm.lock.RLock() - defer pgm.lock.RUnlock() - - state, ok := pgm.podGroupStates[newPodGroupKey(namespace, schedulingGroup)] - if !ok { - return nil, fmt.Errorf("internal pod group state doesn't exist for a pod's scheduling group") - } - return state, nil -} diff --git a/pkg/scheduler/backend/podgroupmanager/podgroupmanager_test.go b/pkg/scheduler/backend/podgroupmanager/podgroupmanager_test.go deleted file mode 100644 index 257cd0a14ea..00000000000 --- a/pkg/scheduler/backend/podgroupmanager/podgroupmanager_test.go +++ /dev/null @@ -1,280 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package podgroupmanager - -import ( - "testing" - - v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2/ktesting" - st "k8s.io/kubernetes/pkg/scheduler/testing" -) - -func TestPodGroupManager_AddPod(t *testing.T) { - p1 := st.MakePod().Namespace("ns1").Name("p1").UID("p1").PodGroupName("pg1").Obj() - // Assigned - p2 := st.MakePod().Namespace("ns1").Name("p2").UID("p2").Node("node1").PodGroupName("pg1").Obj() - // Different ns - p3 := st.MakePod().Namespace("ns2").Name("p3").UID("p3").PodGroupName("pg1").Obj() - nonPodGroupPod := st.MakePod().Namespace("ns1").Name("non-podgroup").Obj() - - tests := []struct { - name string - initPods []*v1.Pod - podToAdd *v1.Pod - - expectedPodGroups int - expectInAllPods bool - expectInUnscheduledPods bool - expectInAssumedPods bool - expectInAssignedPods bool - }{ - { - name: "adding an unscheduled pod", - podToAdd: p1, - expectedPodGroups: 1, - expectInAllPods: true, - expectInUnscheduledPods: true, - }, - { - name: "adding an assigned pod", - podToAdd: p2, - expectedPodGroups: 1, - expectInAllPods: true, - expectInAssignedPods: true, - }, - { - name: "adding pod with different namespace", - initPods: []*v1.Pod{p1}, - podToAdd: p3, - expectedPodGroups: 2, - expectInAllPods: true, - expectInUnscheduledPods: true, - }, - { - name: "adding a non-podgroup pod is a no-op", - podToAdd: nonPodGroupPod, - expectedPodGroups: 0, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - - manager := New(logger) - for _, p := range tt.initPods { - manager.AddPod(p) - } - - 
manager.AddPod(tt.podToAdd) - - gotPodGroups := len(manager.podGroupStates) - if gotPodGroups != tt.expectedPodGroups { - t.Fatalf("Expected %v pod group(s), got %v", tt.expectedPodGroups, gotPodGroups) - } - if gotPodGroups == 0 { - return - } - state, err := manager.PodGroupState(tt.podToAdd.Namespace, tt.podToAdd.Spec.SchedulingGroup) - if err != nil { - t.Fatalf("Unexpected error getting pod group state: %v", err) - } - if inAll := state.AllPods().Has(tt.podToAdd.UID); inAll != tt.expectInAllPods { - t.Errorf("Unexpected AllPods state, want: %v, got: %v", tt.expectInAllPods, inAll) - } - if inAssumed := state.AssumedPods().Has(tt.podToAdd.UID); inAssumed != tt.expectInAssumedPods { - t.Errorf("Unexpected AssumedPods state, want: %v, got: %v", tt.expectInAssumedPods, inAssumed) - } - if inAssigned := state.AssignedPods().Has(tt.podToAdd.UID); inAssigned != tt.expectInAssignedPods { - t.Errorf("Unexpected AssignedPods state, want: %v, got: %v", tt.expectInAssignedPods, inAssigned) - } - }) - } -} - -func TestPodGroupManager_UpdatePod(t *testing.T) { - pod := st.MakePod().Namespace("ns1").Name("p1").UID("p1"). - PodGroupName("pg1").Obj() - updatedPod := st.MakePod().Namespace("ns1").Name("p1").UID("p1").Labels(map[string]string{"foo": "bar"}). - PodGroupName("pg1").Obj() - - assignedPod := st.MakePod().Namespace("ns1").Name("p2").UID("p2").Node("node1"). - PodGroupName("pg1").Obj() - updatedAssignedPod := st.MakePod().Namespace("ns1").Name("p2").UID("p2").Node("node1").Labels(map[string]string{"foo": "bar"}). 
- PodGroupName("pg1").Obj() - - nonPodGroup := st.MakePod().Namespace("ns1").Name("non-podgroup").Obj() - updatedNonPodGroupPod := st.MakePod().Namespace("ns1").Name("non-podgroup").Labels(map[string]string{"foo": "bar"}).Obj() - - tests := []struct { - name string - assumePod bool - oldPod *v1.Pod - newPod *v1.Pod - - expectInAllPods bool - expectInUnscheduledPods bool - expectInAssumedPods bool - expectInAssignedPods bool - }{ - { - name: "updating an unscheduled pod", - oldPod: pod, - newPod: updatedPod, - expectInAllPods: true, - expectInUnscheduledPods: true, - }, - { - name: "updating an assumed pod", - assumePod: true, - oldPod: pod, - newPod: updatedPod, - expectInAllPods: true, - expectInAssumedPods: true, - }, - { - name: "updating an assigned pod", - oldPod: assignedPod, - newPod: updatedAssignedPod, - expectInAllPods: true, - expectInAssignedPods: true, - }, - { - name: "binding an unscheduled pod", - oldPod: pod, - newPod: assignedPod, - expectInAllPods: true, - expectInAssignedPods: true, - }, - { - name: "binding an assumed pod", - assumePod: true, - oldPod: pod, - newPod: assignedPod, - expectInAllPods: true, - expectInAssignedPods: true, - }, - { - name: "updating a non-podgroup pod is a no-op", - oldPod: nonPodGroup, - newPod: updatedNonPodGroupPod, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - - manager := New(logger) - - manager.AddPod(tt.oldPod) - if tt.assumePod { - state, err := manager.PodGroupState(tt.oldPod.Namespace, tt.oldPod.Spec.SchedulingGroup) - if err != nil { - t.Fatalf("Unexpected error getting pod group state: %v", err) - } - state.AssumePod(tt.oldPod.UID) - } - - manager.UpdatePod(tt.oldPod, tt.newPod) - - gotPodGroups := len(manager.podGroupStates) - if gotPodGroups == 0 { - if tt.expectInAllPods { - t.Fatalf("Expected pod group, but got none") - } - return - } - if !tt.expectInAllPods { - t.Fatalf("Expected no pod groups, but got %v", gotPodGroups) - } 
- state, err := manager.PodGroupState(tt.newPod.Namespace, tt.newPod.Spec.SchedulingGroup) - if err != nil { - t.Fatalf("Unexpected error getting pod group state: %v", err) - } - if inAll := state.AllPods().Has(tt.newPod.UID); inAll != tt.expectInAllPods { - t.Errorf("Unexpected AllPods state, want: %v, got: %v", tt.expectInAllPods, inAll) - } - if inAssumed := state.AssumedPods().Has(tt.newPod.UID); inAssumed != tt.expectInAssumedPods { - t.Errorf("Unexpected AssumedPods state, want: %v, got: %v", tt.expectInAssumedPods, inAssumed) - } - if inAssigned := state.AssignedPods().Has(tt.newPod.UID); inAssigned != tt.expectInAssignedPods { - t.Errorf("Unexpected AssignedPods state, want: %v, got: %v", tt.expectInAssignedPods, inAssigned) - } - }) - } -} - -func TestPodGroupManager_DeletePod(t *testing.T) { - p1 := st.MakePod().Namespace("ns1").Name("p1").UID("p1").PodGroupName("pg1").Obj() - p2 := st.MakePod().Namespace("ns1").Name("p2").UID("p2").PodGroupName("pg1").Obj() - - tests := []struct { - name string - initPods []*v1.Pod - podToDelete *v1.Pod - - expectedPodGroups int - }{ - { - name: "deleting a pod from a group with multiple pods", - initPods: []*v1.Pod{p1, p2}, - podToDelete: p1, - expectedPodGroups: 1, - }, - { - name: "deleting the last pod cleans up the state", - initPods: []*v1.Pod{p1}, - podToDelete: p1, - }, - { - name: "deleting a non-existent pod is a no-op", - podToDelete: p1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - - manager := New(logger) - for _, p := range tt.initPods { - manager.AddPod(p) - } - manager.DeletePod(tt.podToDelete) - - gotPodGroups := len(manager.podGroupStates) - if gotPodGroups != tt.expectedPodGroups { - t.Fatalf("Expected %v pod group(s), got %v", tt.expectedPodGroups, gotPodGroups) - } - if gotPodGroups == 0 { - return - } - state, err := manager.PodGroupState(tt.podToDelete.Namespace, tt.podToDelete.Spec.SchedulingGroup) - if err != nil { - 
t.Fatalf("Unexpected error getting pod group state: %v", err) - } - if state.AllPodsCount() == 0 { - t.Errorf("Expected AllPods to be non-empty") - } - if state.AllPods().Has(p1.UID) { - t.Errorf("Expected pod to be deleted") - } - }) - } -} diff --git a/pkg/scheduler/backend/podgroupmanager/podgroupstate.go b/pkg/scheduler/backend/podgroupmanager/podgroupstate.go deleted file mode 100644 index 8d36b619437..00000000000 --- a/pkg/scheduler/backend/podgroupmanager/podgroupstate.go +++ /dev/null @@ -1,248 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package podgroupmanager - -import ( - "sync" - "time" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" - "k8s.io/utils/ptr" -) - -// DefaultSchedulingTimeoutDuration defines how long the gang pods should wait at the -// Permit stage for a quorum before being rejected. -// Variable is exported only for testing purposes. -var DefaultSchedulingTimeoutDuration = 5 * time.Minute - -// podGroupKey uniquely identifies a specific instance of a PodGroup. 
-type podGroupKey struct { - name string - namespace string -} - -func (pgk podGroupKey) GetName() string { - return pgk.name -} - -func (pgk podGroupKey) GetNamespace() string { - return pgk.namespace -} - -var _ klog.KMetadata = &podGroupKey{} - -func newPodGroupKey(namespace string, schedulingGroup *v1.PodSchedulingGroup) podGroupKey { - return podGroupKey{ - name: *schedulingGroup.PodGroupName, - namespace: namespace, - } -} - -// podGroupState holds the runtime state of a pod group. -type podGroupState struct { - lock sync.RWMutex - // allPods tracks all pods belonging to the group that are known to the scheduler. - allPods map[types.UID]*v1.Pod - // unscheduledPods tracks all pods that are unscheduled for this group, - // i.e., are neither assumed nor scheduled. - unscheduledPods sets.Set[types.UID] - // assumedPods tracks pods that have reached the Reserve stage and are waiting - // for the rest of the gang to arrive before being allowed to bind. - assumedPods sets.Set[types.UID] - // assignedPods tracks all pods belonging to the group that are assigned (bound). - assignedPods sets.Set[types.UID] - // schedulingDeadline stores the time at which the gang will time out. - // It is initialized when the first pod from the group enters the Permit stage. - schedulingDeadline *time.Time -} - -func newPodGroupState() *podGroupState { - return &podGroupState{ - allPods: make(map[types.UID]*v1.Pod), - unscheduledPods: sets.New[types.UID](), - assumedPods: sets.New[types.UID](), - assignedPods: sets.New[types.UID](), - } -} - -// addPod adds the pod to this group. -// Depending on the NodeName, it can insert the pod to assignedPods set. -func (pgs *podGroupState) addPod(pod *v1.Pod) { - pgs.lock.Lock() - defer pgs.lock.Unlock() - - pgs.allPods[pod.UID] = pod - if pod.Spec.NodeName != "" { - pgs.assignedPods.Insert(pod.UID) - } else { - pgs.unscheduledPods.Insert(pod.UID) - } -} - -// updatePod updates the pod in this group. 
-// In case of binding, it moves the pod to assignedPods. -func (pgs *podGroupState) updatePod(oldPod, newPod *v1.Pod) { - pgs.lock.Lock() - defer pgs.lock.Unlock() - - pgs.allPods[newPod.UID] = newPod - if oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != "" { - pgs.assignedPods.Insert(newPod.UID) - // Clear pod from unscheduled and assumed when it is assigned. - pgs.unscheduledPods.Delete(newPod.UID) - pgs.assumedPods.Delete(newPod.UID) - } -} - -// deletePod completely deletes the pod from this group. -func (pgs *podGroupState) deletePod(podUID types.UID) { - pgs.lock.Lock() - defer pgs.lock.Unlock() - - delete(pgs.allPods, podUID) - pgs.unscheduledPods.Delete(podUID) - pgs.assumedPods.Delete(podUID) - pgs.assignedPods.Delete(podUID) -} - -// empty returns true when the group is empty. -func (pgs *podGroupState) empty() bool { - pgs.lock.RLock() - defer pgs.lock.RUnlock() - - return len(pgs.allPods) == 0 -} - -// AllPods returns the UIDs of all pods known to the scheduler for this group. -func (pgs *podGroupState) AllPods() sets.Set[types.UID] { - pgs.lock.RLock() - defer pgs.lock.RUnlock() - - return sets.KeySet(pgs.allPods) -} - -// AllPodsCount returns the number of all pods known to the scheduler for this group. -func (pgs *podGroupState) AllPodsCount() int { - pgs.lock.RLock() - defer pgs.lock.RUnlock() - - return len(pgs.allPods) -} - -// UnscheduledPods returns all pods that are unscheduled for this group, -// i.e., are neither assumed nor assigned. -// The returned map type corresponds to the argument of the PodActivator.Activate method. 
-func (pgs *podGroupState) UnscheduledPods() map[string]*v1.Pod { - pgs.lock.RLock() - defer pgs.lock.RUnlock() - - unscheduledPods := make(map[string]*v1.Pod, len(pgs.unscheduledPods)) - for podUID := range pgs.unscheduledPods { - pod := pgs.allPods[podUID] - unscheduledPods[pod.Name] = pod - } - return unscheduledPods -} - -// AssumedPods returns the UIDs of all pods for this group in the assumed state, -// i.e., passed the Reserve gate. -func (pgs *podGroupState) AssumedPods() sets.Set[types.UID] { - pgs.lock.RLock() - defer pgs.lock.RUnlock() - - return pgs.assumedPods.Clone() -} - -// AssignedPods returns the UIDs of all pods already assigned (bound) for this group. -func (pgs *podGroupState) AssignedPods() sets.Set[types.UID] { - pgs.lock.RLock() - defer pgs.lock.RUnlock() - - return pgs.assignedPods.Clone() -} - -// ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned. -func (pgs *podGroupState) ScheduledPodsCount() int { - pgs.lock.RLock() - defer pgs.lock.RUnlock() - - return len(pgs.assumedPods) + len(pgs.assignedPods) -} - -// SchedulingTimeout returns the remaining time until the pod group scheduling times out. -// A new deadline is created if one doesn't exist, or if the previous one has expired. -func (pgs *podGroupState) SchedulingTimeout() time.Duration { - pgs.lock.Lock() - defer pgs.lock.Unlock() - - now := time.Now() - // A new deadline is set if one doesn't exist, or if the old one has passed. - // This allows a new attempt to form a gang after a previous attempt timed out. - if pgs.schedulingDeadline == nil || pgs.schedulingDeadline.Before(now) { - pgs.schedulingDeadline = ptr.To(now.Add(DefaultSchedulingTimeoutDuration)) - } - return pgs.schedulingDeadline.Sub(now) -} - -// AssumePod marks a pod as having reached the Reserve stage. 
-func (pgs *podGroupState) AssumePod(podUID types.UID) { - pgs.lock.Lock() - defer pgs.lock.Unlock() - - pod := pgs.allPods[podUID] - // A scheduling pod may be removed from the cluster. - // In that case, we just ignore it. - if pod == nil { - return - } - - // If the pod is already assigned, put it into assignedPods. - // Otherwise put it to assumedPods. - if pod.Spec.NodeName != "" { - pgs.assignedPods.Insert(podUID) - } else { - pgs.assumedPods.Insert(podUID) - } - - pgs.unscheduledPods.Delete(podUID) -} - -// ForgetPod removes a pod from the assumed state. -func (pgs *podGroupState) ForgetPod(podUID types.UID) { - pgs.lock.Lock() - defer pgs.lock.Unlock() - - pod := pgs.allPods[podUID] - // A scheduling pod may be removed from the cluster. - // In that case, we just ignore it. - if pod == nil { - return - } - - pgs.assumedPods.Delete(podUID) - - // If the pod is already assigned, put it into assignedPods. - // Otherwise, put it into unscheduledPods. - if pod.Spec.NodeName != "" { - pgs.assignedPods.Insert(podUID) - } else { - pgs.unscheduledPods.Insert(podUID) - } -} diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 2745421af89..77a011dcb3b 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -133,10 +133,6 @@ func (sched *Scheduler) addPod(obj interface{}) { return } - if sched.PodGroupManager != nil { - // Register pod into pod group manager before adding to the cache or scheduling queue. - sched.PodGroupManager.AddPod(pod) - } if assignedPod(pod) { sched.addAssignedPodToCache(pod) } else if responsibleForPod(pod, sched.Profiles) { @@ -157,10 +153,6 @@ func (sched *Scheduler) updatePod(oldObj, newObj interface{}) { return } - if sched.PodGroupManager != nil { - // Update pod in pod group manager before updating it in the cache or scheduling queue. 
- sched.PodGroupManager.UpdatePod(oldPod, newPod) - } if assignedPod(oldPod) { sched.updateAssignedPodInCache(oldPod, newPod) } else if assignedPod(newPod) { @@ -185,10 +177,6 @@ func (sched *Scheduler) deletePod(obj interface{}) { switch t := obj.(type) { case *v1.Pod: pod = t - if sched.PodGroupManager != nil { - // Delete pod from pod group manager before deleting the pod from cache or scheduling queue. - sched.PodGroupManager.DeletePod(pod) - } if assignedPod(pod) { sched.deleteAssignedPodFromCache(pod) } else if responsibleForPod(pod, sched.Profiles) { @@ -204,10 +192,6 @@ func (sched *Scheduler) deletePod(obj interface{}) { utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj) return } - if sched.PodGroupManager != nil { - // Delete pod from pod group manager before deleting the pod from cache or scheduling queue. - sched.PodGroupManager.DeletePod(pod) - } // The carried object may be stale, so we don't use it to check if // it's assigned or not. Attempting to cleanup anyways. 
sched.deleteAssignedPodFromCache(pod) @@ -228,6 +212,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(pod *v1.Pod) { logger := sched.logger logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod)) + sched.Cache.AddPodGroupMember(pod) sched.SchedulingQueue.Add(klog.NewContext(context.Background(), logger), pod) if utilfeature.DefaultFeatureGate.Enabled(features.GangScheduling) { sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnscheduledPodAdd, nil, pod, nil) @@ -313,6 +298,8 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldPod, newPod *v1.Pod) { _ = sched.syncPodWithDispatcher(newPod) } + sched.Cache.UpdatePodGroupMember(logger, oldPod, newPod) + isAssumed, err := sched.Cache.IsAssumedPod(newPod) if err != nil { utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(newPod)) @@ -354,6 +341,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(pod *v1.Pod, inBinding bool // once the https://github.com/kubernetes/kubernetes/issues/134859 is fixed. 
return } + sched.Cache.RemovePodGroupMember(pod) isAssumed, err := sched.Cache.IsAssumedPod(pod) if err != nil { utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(pod)) diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 1a68a17bde9..0294103e6f2 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -184,7 +184,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { // disable backoff queue internalqueue.WithPodInitialBackoffDuration(0), internalqueue.WithPodMaxBackoffDuration(0)) - schedulerCache := internalcache.New(ctx, nil) + schedulerCache := internalcache.New(ctx, nil, false) // Put test pods into unschedulable queue for _, pod := range unschedulablePods { @@ -254,7 +254,7 @@ func TestUpdateAssignedPodInCache(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() sched := &Scheduler{ - Cache: internalcache.New(ctx, nil), + Cache: internalcache.New(ctx, nil, false), SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), logger: logger, } @@ -795,7 +795,7 @@ func TestAddPod(t *testing.T) { defer cancel() sched := &Scheduler{ - Cache: internalcache.New(ctx, nil), + Cache: internalcache.New(ctx, nil, false), SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), logger: logger, Profiles: profile.Map{ @@ -930,7 +930,7 @@ func TestUpdatePod(t *testing.T) { t.Fatalf("Failed to create framework: %v", err) } sched := &Scheduler{ - Cache: internalcache.New(ctx, nil), + Cache: internalcache.New(ctx, nil, false), SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), logger: logger, Profiles: profile.Map{ @@ -1049,7 +1049,7 @@ func TestDeletePod(t *testing.T) { t.Fatalf("Failed to create framework: %v", err) } sched := &Scheduler{ - Cache: internalcache.New(ctx, nil), + Cache: internalcache.New(ctx, nil, false), SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), logger: logger, Profiles: 
profile.Map{ diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index 5e3e6049938..e65e5f4a4bf 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -332,7 +332,7 @@ func TestSchedulerWithExtenders(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, false) for _, name := range test.nodes { cache.AddNode(logger, createNode(name)) } diff --git a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go index ad1b79e2e37..f496d1f2cbc 100644 --- a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go +++ b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go @@ -33,6 +33,8 @@ var _ fwk.NodeInfoLister = &nodeInfoListerContract{} var _ fwk.StorageInfoLister = &storageInfoListerContract{} var _ fwk.SharedLister = &shareListerContract{} var _ fwk.ResourceSliceLister = &resourceSliceListerContract{} +var _ fwk.PodGroupStateLister = &podGroupStateListerContract{} +var _ fwk.PodGroupState = &podGroupStateContract{} var _ fwk.DeviceClassLister = &deviceClassListerContract{} var _ fwk.ResourceClaimTracker = &resourceClaimTrackerContract{} var _ fwk.DeviceClassResolver = &deviceClassResolverContract{} @@ -72,6 +74,46 @@ func (c *shareListerContract) StorageInfos() fwk.StorageInfoLister { return nil } +func (c *shareListerContract) PodGroupStates() fwk.PodGroupStateLister { + return nil +} + +type podGroupStateListerContract struct{} + +func (c *podGroupStateListerContract) Get(_ string, _ string) (fwk.PodGroupState, error) { + return nil, nil +} + +type podGroupStateContract struct{} + +func (c *podGroupStateContract) AllPods() sets.Set[types.UID] { + return nil +} + +func (c *podGroupStateContract) UnscheduledPods() map[string]*v1.Pod { + return nil +} + +func (c *podGroupStateContract) AssumedPods() sets.Set[types.UID] { 
+ return nil +} + +func (c *podGroupStateContract) AssignedPods() sets.Set[types.UID] { + return nil +} + +func (c *podGroupStateContract) AllPodsCount() int { + return 0 +} + +func (c *podGroupStateContract) ScheduledPodsCount() int { + return 0 +} + +func (c *podGroupStateContract) ScheduledPods() []*v1.Pod { + return nil +} + type resourceSliceListerContract struct{} func (c *resourceSliceListerContract) ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error) { diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go index d64e1550bef..a2a3078e6d7 100644 --- a/pkg/scheduler/framework/cycle_state.go +++ b/pkg/scheduler/framework/cycle_state.go @@ -39,6 +39,8 @@ type CycleState struct { // GetParallelPreBindPlugins returns plugins that can be run in parallel with other plugins // in the PreBind extension point. parallelPreBindPlugins sets.Set[string] + // isPodGroupSchedulingCycle indicates whether this cycle is a pod group scheduling cycle or not. + isPodGroupSchedulingCycle bool } // NewCycleState initializes a new CycleState and returns its pointer. @@ -94,6 +96,14 @@ func (c *CycleState) GetParallelPreBindPlugins() sets.Set[string] { return c.parallelPreBindPlugins } +func (c *CycleState) IsPodGroupSchedulingCycle() bool { + return c.isPodGroupSchedulingCycle +} + +func (c *CycleState) SetPodGroupSchedulingCycle(isPodGroupSchedulingCycle bool) { + c.isPodGroupSchedulingCycle = isPodGroupSchedulingCycle +} + // Clone creates a copy of CycleState and returns its pointer. Clone returns // nil if the context being cloned is nil. 
func (c *CycleState) Clone() fwk.CycleState { @@ -112,6 +122,7 @@ func (c *CycleState) Clone() fwk.CycleState { copy.skipScorePlugins = c.skipScorePlugins copy.skipPreBindPlugins = c.skipPreBindPlugins copy.parallelPreBindPlugins = c.parallelPreBindPlugins + copy.isPodGroupSchedulingCycle = c.isPodGroupSchedulingCycle return copy } diff --git a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go index 7d4aedf569f..3b6ab93dc03 100644 --- a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go +++ b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go @@ -94,7 +94,7 @@ func TestDefaultBinder(t *testing.T) { t.Fatal(err) } if asyncAPICallsEnabled { - cache := internalcache.New(ctx, apiDispatcher) + cache := internalcache.New(ctx, apiDispatcher, false) fh.SetAPICacher(apicache.New(nil, cache)) } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index f37bb99f2cf..5818b35b63d 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -456,7 +456,7 @@ func TestPostFilter(t *testing.T) { t.Fatal(err) } if asyncAPICallsEnabled { - cache := internalcache.New(ctx, apiDispatcher) + cache := internalcache.New(ctx, apiDispatcher, false) f.SetAPICacher(apicache.New(nil, cache)) } @@ -2183,7 +2183,7 @@ func TestPreempt(t *testing.T) { defer apiDispatcher.Close() } - cache := internalcache.New(ctx, apiDispatcher) + cache := internalcache.New(ctx, apiDispatcher, false) for _, pod := range testPods { if err := cache.AddPod(logger, pod.DeepCopy()); err != nil { t.Fatalf("Failed to add pod %s: %v", pod.Name, err) diff --git a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go 
b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go index 8d852caa130..130b379f2ee 100644 --- a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go +++ b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go @@ -25,7 +25,6 @@ import ( schedulingapi "k8s.io/api/scheduling/v1alpha2" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha2" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" @@ -38,25 +37,31 @@ import ( const ( // Name is the name of the plugin used in the plugin registry and configurations. Name = names.GangScheduling + // permitTimeoutDuration defines how long the gang pods should + // wait at the permit stage for a quorum before being rejected. + permitTimeoutDuration = 5 * time.Minute ) // GangScheduling is a plugin that enforces "all-or-nothing" scheduling for pods // belonging to a PodGroup with a Gang scheduling policy. type GangScheduling struct { - handle fwk.Handle - podGroupLister schedulinglisters.PodGroupLister + handle fwk.Handle + podGroupLister schedulinglisters.PodGroupLister + podGroupManager fwk.PodGroupManager + snapshotLister fwk.SharedLister } var _ fwk.EnqueueExtensions = &GangScheduling{} var _ fwk.PreEnqueuePlugin = &GangScheduling{} -var _ fwk.ReservePlugin = &GangScheduling{} var _ fwk.PermitPlugin = &GangScheduling{} // New initializes a new plugin and returns it. 
func New(_ context.Context, _ runtime.Object, fh fwk.Handle, fts feature.Features) (fwk.Plugin, error) { return &GangScheduling{ - handle: fh, - podGroupLister: fh.SharedInformerFactory().Scheduling().V1alpha2().PodGroups().Lister(), + handle: fh, + podGroupLister: fh.SharedInformerFactory().Scheduling().V1alpha2().PodGroups().Lister(), + podGroupManager: fh.PodGroupManager(), + snapshotLister: fh.SnapshotSharedLister(), }, nil } @@ -137,7 +142,7 @@ func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Stat return nil } - podGroupState, err := pl.handle.PodGroupManager().PodGroupState(namespace, schedulingGroup) + podGroupState, err := pl.podGroupManager.PodGroupStates().Get(namespace, *schedulingGroup.PodGroupName) if err != nil { return fwk.AsStatus(err) } @@ -150,35 +155,6 @@ func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Stat return nil } -// Reserve is called after a node has been selected for the pod. For gang pods, -// this stage marks the pod as "assumed" in the PodGroupManager, -// contributing to the count of pods ready to be co-scheduled at the Permit stage. -func (pl *GangScheduling) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status { - if pod.Spec.SchedulingGroup == nil { - return nil - } - podGroupState, err := pl.handle.PodGroupManager().PodGroupState(pod.Namespace, pod.Spec.SchedulingGroup) - if err != nil { - return fwk.AsStatus(err) - } - podGroupState.AssumePod(pod.UID) - return nil -} - -// Unreserve removes the gang pod from the "assumed" state in the PodGroupManager, -// ensuring it doesn't count towards the Permit quorum. 
-func (pl *GangScheduling) Unreserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) { - if pod.Spec.SchedulingGroup == nil { - return - } - podGroupState, err := pl.handle.PodGroupManager().PodGroupState(pod.Namespace, pod.Spec.SchedulingGroup) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "Failed to get pod group state", "pod", klog.KObj(pod), "schedulingGroup", pod.Spec.SchedulingGroup) - return - } - podGroupState.ForgetPod(pod.UID) -} - // Permit forces all pods in a gang to wait at this stage. Once the number of waiting (assumed) pods // reaches the gang's MinCount, all pods in the gang are permitted to proceed to binding simultaneously. func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) { @@ -203,7 +179,16 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod return nil, 0 } - podGroupState, err := pl.handle.PodGroupManager().PodGroupState(namespace, schedulingGroup) + // Select a lister for the pod group state based on the currently executed scheduling phase. + // In the pod group scheduling cycle, it reads from the snapshot. + // Otherwise, it reads the runtime state of the pod group from the cache. + var podGroupStateLister fwk.PodGroupStateLister + if state.IsPodGroupSchedulingCycle() { + podGroupStateLister = pl.snapshotLister.PodGroupStates() + } else { + podGroupStateLister = pl.podGroupManager.PodGroupStates() + } + podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName) if err != nil { return fwk.AsStatus(err), 0 } @@ -213,7 +198,7 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod unscheduledPods := podGroupState.UnscheduledPods() pl.handle.Activate(klog.FromContext(ctx), unscheduledPods) logger.V(4).Info("Quorum is not met for a gang. 
Waiting for another pod to allow", "pod", klog.KObj(pod), "schedulingGroup", schedulingGroup, "activatedPods", len(unscheduledPods)) - return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), podGroupState.SchedulingTimeout() + return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), permitTimeoutDuration } assumedPods := podGroupState.AssumedPods() diff --git a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go index bb64d0fa1a0..73d1d6009ae 100644 --- a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go +++ b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go @@ -24,17 +24,27 @@ import ( v1 "k8s.io/api/core/v1" schedulingapi "k8s.io/api/scheduling/v1alpha2" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" fwk "k8s.io/kube-scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager" + "k8s.io/kubernetes/pkg/features" + internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + "k8s.io/kubernetes/pkg/scheduler/metrics" st "k8s.io/kubernetes/pkg/scheduler/testing" ) +func init() { + // This is required for tests where cache is initialized, and cache attempts to update metrics. 
+ metrics.Register() +} + func Test_isSchedulableAfterPodAdded(t *testing.T) { tests := []struct { name string @@ -161,7 +171,6 @@ func TestGangSchedulingFlow(t *testing.T) { p3 := st.MakePod().Namespace("ns1").Name("p3").UID("p3").PodGroupName("pg1").Obj() p4 := st.MakePod().Namespace("ns1").Name("p4").UID("p4").PodGroupName("pg2").Obj() - p5 := st.MakePod().Namespace("ns1").Name("p5").UID("p5").PodGroupName("pg2").Obj() basicPolicyPod := st.MakePod().Namespace("ns1").Name("basic-pod").UID("basic-pod").PodGroupName("pg3").Obj() @@ -169,15 +178,16 @@ func TestGangSchedulingFlow(t *testing.T) { nonGangPod := st.MakePod().Namespace("ns1").Name("non-gang").UID("non-gang").Obj() tests := []struct { - name string - pod *v1.Pod - initialPods []*v1.Pod - initialPodGroups []*schedulingapi.PodGroup - podsWaitingOnPermit []*v1.Pod - wantPreEnqueueStatus *fwk.Status - wantPermitStatus *fwk.Status - wantActivatedPods []*v1.Pod - wantAllowedPods []types.UID + name string + pod *v1.Pod + initialPods []*v1.Pod + initialPodGroups []*schedulingapi.PodGroup + podsWaitingOnPermit []*v1.Pod + isDuringPodGroupSchedulingCycle bool + wantPreEnqueueStatus *fwk.Status + wantPermitStatus *fwk.Status + wantActivatedPods []*v1.Pod + wantAllowedPods []types.UID }{ { name: "non-gang pod succeeds immediately", @@ -228,23 +238,35 @@ func TestGangSchedulingFlow(t *testing.T) { wantPermitStatus: nil, wantAllowedPods: []types.UID{"p1", "p2", "p3"}, }, + { + name: "final gang pod arrives at Permit during pod group scheduling cycle", + pod: p1, + initialPods: []*v1.Pod{p2, p3, p4, p5}, + initialPodGroups: []*schedulingapi.PodGroup{gangPodGroup1, gangPodGroup2}, + podsWaitingOnPermit: []*v1.Pod{p2, p3, p4, p5}, + isDuringPodGroupSchedulingCycle: true, + wantPreEnqueueStatus: nil, + wantPermitStatus: nil, + wantAllowedPods: []types.UID{"p1", "p2", "p3"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, 
features.GenericWorkload, true) logger, ctx := ktesting.NewTestContext(t) - manager := podgroupmanager.New(logger) + cache := internalcache.New(ctx, nil, true) informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(), 0) podGroupInformer := informerFactory.Scheduling().V1alpha2().PodGroups() - fakeActivator := &podActivatorMock{} - + snapshot := internalcache.NewEmptySnapshot() fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodGroupManager(manager), + frameworkruntime.WithPodGroupManager(cache), frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), frameworkruntime.WithPodActivator(fakeActivator), + frameworkruntime.WithSnapshotSharedLister(snapshot), ) if err != nil { t.Fatalf("Failed to create framework: %v", err) @@ -257,10 +279,11 @@ func TestGangSchedulingFlow(t *testing.T) { t.Fatalf("Failed to add podGroup %s to store: %v", wl.Name, err) } } + for _, p := range tt.initialPods { - manager.AddPod(p) + cache.AddPodGroupMember(p) } - manager.AddPod(tt.pod) + cache.AddPodGroupMember(tt.pod) p, err := New(ctx, nil, fh, feature.Features{EnableGangScheduling: true}) if err != nil { @@ -279,32 +302,62 @@ func TestGangSchedulingFlow(t *testing.T) { // Simulate that other pods have already hit Permit and are now waiting. for _, p := range tt.podsWaitingOnPermit { - // Run Reserve and Permit for these pods to get them into the "assumed" state inside the manager. 
- status := pl.Reserve(ctx, nil, p, "some-node") - if !status.IsSuccess() { - t.Fatalf("Unexpected Reserve status for pod %q: %v", p.Name, status) + pod := p.DeepCopy() + pod.Spec.NodeName = "some-node" + if err := cache.AssumePod(logger, pod); err != nil { + t.Fatalf("Failed to assume pod %q: %v", pod.Name, err) } - status, _ = pl.Permit(ctx, nil, p, "some-node") + status, _ := pl.Permit(ctx, schedulerframework.NewCycleState(), pod, "some-node") if status.Code() != fwk.Wait { - t.Fatalf("Expected Wait status while permitting a pod %q: %v", p.Name, status) + t.Fatalf("Expected Wait status while permitting a pod %q: %v", pod.Name, status) } } - status := pl.Reserve(ctx, nil, tt.pod, "some-node") - if !status.IsSuccess() { - t.Fatalf("Unexpected Reserve status: %v", status) - } - // Clear activated pods to assert those activated in tt.pod Permit. fakeActivator.activatedPods = nil - gotPermitStatus, _ := pl.Permit(ctx, nil, tt.pod, "some-node") + cycleState := schedulerframework.NewCycleState() + cycleState.SetPodGroupSchedulingCycle(tt.isDuringPodGroupSchedulingCycle) + + pod := tt.pod.DeepCopy() + pod.Spec.NodeName = "some-node" + + // In a pod group scheduling cycle, a snapshot is taken after all + // waiting pods are assumed, so that Permit can read from it. + if tt.isDuringPodGroupSchedulingCycle { + if err := cache.UpdateSnapshot(logger, snapshot); err != nil { + t.Fatalf("Failed to update snapshot: %v", err) + } + podInfo, err := schedulerframework.NewPodInfo(pod) + if err != nil { + t.Fatalf("Failed to create pod info for %q: %v", pod.Name, err) + } + // Assume pod in the snapshot, as in a pod group scheduling cycle. + if err := snapshot.AssumePod(podInfo); err != nil { + t.Fatalf("Failed to assume pod %q in snapshot: %v", pod.Name, err) + } + } else { + // Assume pod in the cache, as in a pod-by-pod scheduling cycle, where Permit reads from cache. 
+ if err := cache.AssumePod(logger, pod); err != nil { + t.Fatalf("Failed to assume pod %q in cache: %v", pod.Name, err) + } + } + + gotPermitStatus, _ := pl.Permit(ctx, cycleState, pod, "some-node") if diff := cmp.Diff(tt.wantPermitStatus, gotPermitStatus); diff != "" { t.Fatalf("Unexpected Permit status (-want, +got):\n%s", diff) } if gotPermitStatus.Code() == fwk.Wait { - // Pod waits for others from a gang. Simulate its eventual Unreserve. - pl.Unreserve(ctx, nil, tt.pod, "some-node") + // Pod waits for others from a gang. Simulate its eventual forget. + if tt.isDuringPodGroupSchedulingCycle { + if err := snapshot.ForgetPod(logger, pod); err != nil { + t.Fatalf("Failed to forget pod %q from snapshot: %v", pod.Name, err) + } + } else { + if err := cache.ForgetPod(logger, pod); err != nil { + t.Fatalf("Failed to forget pod %q from cache: %v", pod.Name, err) + } + } return } diff --git a/pkg/scheduler/framework/preemption/executor_test.go b/pkg/scheduler/framework/preemption/executor_test.go index aa844027867..af9b24fab30 100644 --- a/pkg/scheduler/framework/preemption/executor_test.go +++ b/pkg/scheduler/framework/preemption/executor_test.go @@ -602,7 +602,7 @@ func TestPrepareCandidate(t *testing.T) { informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) if asyncAPICallsEnabled { - cache := internalcache.New(ctx, apiDispatcher) + cache := internalcache.New(ctx, apiDispatcher, false) fwk.SetAPICacher(apicache.New(nil, cache)) } @@ -829,7 +829,7 @@ func TestPrepareCandidateAsyncSetsPreemptingSets(t *testing.T) { } informerFactory.Start(ctx.Done()) if asyncAPICallsEnabled { - cache := internalcache.New(ctx, apiDispatcher) + cache := internalcache.New(ctx, apiDispatcher, false) fwk.SetAPICacher(apicache.New(nil, cache)) } diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 70de71cf378..b7667d9f1cf 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go 
+++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -598,7 +598,7 @@ func TestCallExtenders(t *testing.T) { } informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) - cache := internalcache.New(ctx, apiDispatcher) + cache := internalcache.New(ctx, apiDispatcher, false) fwk.SetAPICacher(apicache.New(nil, cache)) fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{} diff --git a/pkg/scheduler/framework/runtime/batch_test.go b/pkg/scheduler/framework/runtime/batch_test.go index 81974a92cd9..7743ae15a1b 100644 --- a/pkg/scheduler/framework/runtime/batch_test.go +++ b/pkg/scheduler/framework/runtime/batch_test.go @@ -82,6 +82,10 @@ func (s sharedLister) StorageInfos() fwk.StorageInfoLister { return &storageInfoListerContract{} } +func (s sharedLister) PodGroupStates() fwk.PodGroupStateLister { + return nil +} + var batchRegistry = func() Registry { r := make(Registry) err := r.Register("batchTest", newBatchTestPlugin) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 41c6569ba21..2bfc640293a 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -262,7 +262,7 @@ func WithAPIDispatcher(apiDispatcher *apidispatcher.APIDispatcher) Option { } } -// WithPodGroupManager sets Pod group manager for the scheduling frameworkImpl. +// WithPodGroupManager sets the PodGroupManager for the scheduling frameworkImpl. 
func WithPodGroupManager(podGroupManager fwk.PodGroupManager) Option { return func(o *frameworkOptions) { o.podGroupManager = podGroupManager diff --git a/pkg/scheduler/schedule_one_podgroup.go b/pkg/scheduler/schedule_one_podgroup.go index 61a391baa4f..89978399438 100644 --- a/pkg/scheduler/schedule_one_podgroup.go +++ b/pkg/scheduler/schedule_one_podgroup.go @@ -120,7 +120,7 @@ func (sched *Scheduler) podGroupInfoForPod(ctx context.Context, pInfo *framework logger := klog.FromContext(ctx) // Get the actual pod group state - podGroupState, err := sched.PodGroupManager.PodGroupState(pInfo.Pod.Namespace, pInfo.Pod.Spec.SchedulingGroup) + podGroupState, err := sched.Cache.PodGroupStates().Get(pInfo.Pod.Namespace, *pInfo.Pod.Spec.SchedulingGroup.PodGroupName) if err != nil { return nil, fmt.Errorf("error while retrieving pod group state: %w", err) } @@ -193,6 +193,9 @@ func initPodSchedulingContext(ctx context.Context, pod *v1.Pod) *podSchedulingCo podsToActivate := framework.NewPodsToActivate() state.Write(framework.PodsToActivateKey, podsToActivate) + // Marks this cycle as a pod group scheduling cycle. + state.SetPodGroupSchedulingCycle(true) + return &podSchedulingContext{ logger: logger, state: state, @@ -439,6 +442,8 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched case podGroupResult.status.IsSuccess(): // Pod no longer needs a pod group scheduling cycle. Setting it to false to disable any checks in further functions. pInfo.NeedsPodGroupScheduling = false + // Disable pod group scheduling in cycle state before binding. + podCtx.state.SetPodGroupSchedulingCycle(false) // Schedule result is applied for pod and its binding cycle executes. 
assumedPodInfo, status := sched.prepareForBindingCycle(ctx, podCtx.state, schedFwk, pInfo, podCtx.podsToActivate, podResult.scheduleResult) if !status.IsSuccess() { diff --git a/pkg/scheduler/schedule_one_podgroup_test.go b/pkg/scheduler/schedule_one_podgroup_test.go index e5cd7c74e24..3c6714744bc 100644 --- a/pkg/scheduler/schedule_one_podgroup_test.go +++ b/pkg/scheduler/schedule_one_podgroup_test.go @@ -42,7 +42,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake" - "k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" @@ -149,11 +148,11 @@ func TestPodGroupInfoForPod(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) - manager := podgroupmanager.New(logger) + cache := internalcache.New(ctx, nil, true) - manager.AddPod(tt.pInfo.Pod) + cache.AddPodGroupMember(tt.pInfo.Pod) for _, pod := range tt.unscheduledPods { - manager.AddPod(pod) + cache.AddPodGroupMember(pod) } q := internalqueue.NewTestQueue(ctx, nil) @@ -164,7 +163,7 @@ func TestPodGroupInfoForPod(t *testing.T) { } } sched := &Scheduler{ - PodGroupManager: manager, + Cache: cache, SchedulingQueue: q, } @@ -274,7 +273,7 @@ func TestSkipPodGroupPodSchedule(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, true) registry := frameworkruntime.Registry{ queuesort.Name: queuesort.New, defaultbinder.Name: defaultbinder.New, @@ -362,7 +361,7 @@ func TestPodGroupCycle_UpdateSnapshotError(t *testing.T) { // Create fake cache that returns error on UpdateSnapshot updateSnapshotErr := fmt.Errorf("update snapshot error") cache := &fakecache.Cache{ - Cache: 
internalcache.New(ctx, nil), + Cache: internalcache.New(ctx, nil, true), UpdateSnapshotFunc: func(nodeSnapshot *internalcache.Snapshot) error { return updateSnapshotErr }, @@ -795,6 +794,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { client := clientsetfake.NewClientset(testNode) informerFactory := informers.NewSharedInformerFactory(client, 0) queue := internalqueue.NewSchedulingQueue(nil, informerFactory) + snapshot := internalcache.NewEmptySnapshot() registry := []tf.RegisterPluginFunc{ tf.RegisterFilterPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { @@ -816,19 +816,19 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(events.NewFakeRecorder(100)), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithSnapshotSharedLister(internalcache.NewEmptySnapshot()), + frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithPodNominator(queue), ) if err != nil { t.Fatalf("Failed to create new framework: %v", err) } - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, true) cache.AddNode(logger, testNode) sched := &Scheduler{ Cache: cache, - nodeInfoSnapshot: internalcache.NewEmptySnapshot(), + nodeInfoSnapshot: snapshot, SchedulingQueue: queue, Profiles: profile.Map{"test-scheduler": schedFwk}, } @@ -1236,7 +1236,7 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { t.Fatalf("Failed to create new framework: %v", err) } - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, true) cache.AddNode(klog.FromContext(ctx), testNode) sched := &Scheduler{ diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 550cc232860..58cd28547bb 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -65,7 +65,6 @@ import ( apidispatcher 
"k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake" - "k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" @@ -1030,20 +1029,6 @@ func TestSchedulerScheduleOne(t *testing.T) { var gotBinding *v1.Binding var gotNominatingInfo *fwk.NominatingInfo - var pgm podgroupmanager.PodGroupManager - if scheduleAsPodGroup { - group := &v1.PodSchedulingGroup{ - PodGroupName: new("pg"), - } - // When scheduling a pod as a pod group, set scheduling group to all relevant pods. - item.sendPod = withSchedulingGroup(item.sendPod, group) - item.expectErrorPod = withSchedulingGroup(item.expectErrorPod, group) - item.expectPodInBackoffQ = withSchedulingGroup(item.expectPodInBackoffQ, group) - item.expectPodInUnschedulable = withSchedulingGroup(item.expectPodInUnschedulable, group) - pgm = podgroupmanager.New(logger) - pgm.AddPod(item.sendPod) - } - client := clientsetfake.NewClientset(item.sendPod) informerFactory := informers.NewSharedInformerFactory(client, 0) client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { @@ -1061,7 +1046,19 @@ func TestSchedulerScheduleOne(t *testing.T) { defer apiDispatcher.Close() } - internalCache := internalcache.New(ctx, apiDispatcher) + internalCache := internalcache.New(ctx, apiDispatcher, scheduleAsPodGroup) + + if scheduleAsPodGroup { + group := &v1.PodSchedulingGroup{ + PodGroupName: new("pg"), + } + // When scheduling a pod as a pod group, set scheduling group to all relevant pods. 
+ item.sendPod = withSchedulingGroup(item.sendPod, group) + item.expectErrorPod = withSchedulingGroup(item.expectErrorPod, group) + item.expectPodInBackoffQ = withSchedulingGroup(item.expectPodInBackoffQ, group) + item.expectPodInUnschedulable = withSchedulingGroup(item.expectPodInUnschedulable, group) + internalCache.AddPodGroupMember(item.sendPod) + } cache := &fakecache.Cache{ Cache: internalCache, ForgetFunc: func(pod *v1.Pod) { @@ -1119,7 +1116,6 @@ func TestSchedulerScheduleOne(t *testing.T) { frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), frameworkruntime.WithPodsInPreBind(frameworkruntime.NewPodsInPreBindMap()), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodGroupManager(pgm), ) if err != nil { t.Fatal(err) @@ -1138,7 +1134,6 @@ func TestSchedulerScheduleOne(t *testing.T) { SchedulingQueue: queue, Profiles: profile.Map{testSchedulerName: schedFramework}, APIDispatcher: apiDispatcher, - PodGroupManager: pgm, nominatedNodeNameForExpectationEnabled: features.nominatedNodeNameForExpectationEnabled, } queue.Add(ctx, item.sendPod) @@ -1717,7 +1712,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) { defer apiDispatcher.Close() } - internalCache := internalcache.New(ctx, apiDispatcher) + internalCache := internalcache.New(ctx, apiDispatcher, false) cache := &fakecache.Cache{ Cache: internalCache, ForgetFunc: func(pod *v1.Pod) { @@ -1906,7 +1901,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { defer apiDispatcher.Close() } - scache := internalcache.New(ctx, apiDispatcher) + scache := internalcache.New(ctx, apiDispatcher, false) firstPod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}} scache.AddNode(logger, &node) @@ -1995,7 +1990,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { defer apiDispatcher.Close() } - scache := internalcache.New(ctx, apiDispatcher) + scache := 
internalcache.New(ctx, apiDispatcher, false) // Design the baseline for the pods, and we will make nodes that don't fit it later. var cpu = int64(4) @@ -2303,7 +2298,7 @@ func TestSchedulerBinding(t *testing.T) { if err != nil { t.Fatal(err) } - cache := internalcache.New(ctx, apiDispatcher) + cache := internalcache.New(ctx, apiDispatcher, false) if asyncAPICallsEnabled { informerFactory := informers.NewSharedInformerFactory(client, 0) ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done()) @@ -3626,7 +3621,7 @@ func TestSchedulerSchedulePod(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, false) for _, pod := range test.pods { cache.AddPod(logger, pod) } @@ -4240,7 +4235,7 @@ func Test_prioritizeNodes(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, false) for _, node := range test.nodes { cache.AddNode(klog.FromContext(ctx), node) } @@ -4442,7 +4437,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { nodes := makeNodeList([]string{"node1", "node2", "node3"}) client := clientsetfake.NewClientset(test.pod) informerFactory := informers.NewSharedInformerFactory(client, 0) - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, false) for _, n := range nodes { cache.AddNode(logger, n) } @@ -4525,7 +4520,7 @@ func makeNodeList(nodeNames []string) []*v1.Node { // makeScheduler makes a simple Scheduler for testing. 
func makeScheduler(ctx context.Context, nodes []*v1.Node) *Scheduler { logger := klog.FromContext(ctx) - cache := internalcache.New(ctx, nil) + cache := internalcache.New(ctx, nil, false) for _, n := range nodes { cache.AddNode(logger, n) } @@ -4689,7 +4684,7 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, clie t.Cleanup(apiDispatcher.Close) } - scache := internalcache.New(ctx, apiDispatcher) + scache := internalcache.New(ctx, apiDispatcher, false) scache.AddNode(logger, &testNode) informerFactory := informers.NewSharedInformerFactory(client, 0) pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 53a70f3c635..541e91a45f1 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -44,7 +44,6 @@ import ( apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/backend/cache/debugger" - internalpodgroupmanager "k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" @@ -98,9 +97,6 @@ type Scheduler struct { // framework.APICache should be used instead. APIDispatcher *apidispatcher.APIDispatcher - // PodGroupManager can be used to provide workload-aware scheduling. - PodGroupManager internalpodgroupmanager.PodGroupManager - // Profiles are the scheduling profiles. 
Profiles profile.Map @@ -351,10 +347,8 @@ func New(ctx context.Context, if feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) { apiDispatcher = apidispatcher.New(client, int(options.parallelism), apicalls.Relevances) } - var podGroupManager internalpodgroupmanager.PodGroupManager - if feature.DefaultFeatureGate.Enabled(features.GenericWorkload) { - podGroupManager = internalpodgroupmanager.New(logger) - } + + schedulerCache := internalcache.New(ctx, apiDispatcher, feature.DefaultFeatureGate.Enabled(features.GenericWorkload)) profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), @@ -371,7 +365,7 @@ func New(ctx context.Context, frameworkruntime.WithPodsInPreBind(podsInPreBind), frameworkruntime.WithAPIDispatcher(apiDispatcher), frameworkruntime.WithSharedCSIManager(sharedCSIManager), - frameworkruntime.WithPodGroupManager(podGroupManager), + frameworkruntime.WithPodGroupManager(schedulerCache), ) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) @@ -423,8 +417,6 @@ func New(ctx context.Context, internalqueue.WithPodSigners(podSigners), ) - schedulerCache := internalcache.New(ctx, apiDispatcher) - var apiCache fwk.APICacher if apiDispatcher != nil { apiCache = apicache.New(podQueue, schedulerCache) @@ -452,7 +444,6 @@ func New(ctx context.Context, logger: logger, APIDispatcher: apiDispatcher, nominatedNodeNameForExpectationEnabled: feature.DefaultFeatureGate.Enabled(features.NominatedNodeNameForExpectation), - PodGroupManager: podGroupManager, } sched.NextPod = podQueue.Pop sched.applyDefaultHandlers() diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index dea0134fb9b..4c5d0f2cd40 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -310,7 +310,7 @@ func TestFailureHandler(t *testing.T) { recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, 
ctx.Done()) queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())), internalqueue.WithMetricsRecorder(recorder), internalqueue.WithAPIDispatcher(apiDispatcher)) - schedulerCache := internalcache.New(ctx, apiDispatcher) + schedulerCache := internalcache.New(ctx, apiDispatcher, false) queue.Add(ctx, testPod) @@ -384,7 +384,7 @@ func TestFailureHandler_PodAlreadyBound(t *testing.T) { } queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())), internalqueue.WithAPIDispatcher(apiDispatcher)) - schedulerCache := internalcache.New(ctx, apiDispatcher) + schedulerCache := internalcache.New(ctx, apiDispatcher, false) // Add node to schedulerCache no matter it's deleted in API server or not. schedulerCache.AddNode(logger, &nodeFoo) diff --git a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go index e868e102035..6c4335b3980 100644 --- a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go +++ b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go @@ -89,6 +89,10 @@ type CycleState interface { // Clone creates a copy of CycleState and returns its pointer. Clone returns // nil if the context being cloned is nil. Clone() CycleState + // IsPodGroupSchedulingCycle returns true if this cycle is a pod group scheduling cycle. + IsPodGroupSchedulingCycle() bool + // SetPodGroupSchedulingCycle sets whether this cycle is a pod group scheduling cycle or not. + SetPodGroupSchedulingCycle(bool) } // PodGroupCycleState provides a mechanism for plugins that operate on pod groups to store and retrieve arbitrary data. 
diff --git a/staging/src/k8s.io/kube-scheduler/framework/interface.go b/staging/src/k8s.io/kube-scheduler/framework/interface.go index fb2aea9a34d..ce465fd6a5e 100644 --- a/staging/src/k8s.io/kube-scheduler/framework/interface.go +++ b/staging/src/k8s.io/kube-scheduler/framework/interface.go @@ -882,7 +882,7 @@ type Handle interface { // ProfileName returns the profile name associated to a profile. ProfileName() string - // PodGroupManager can be used to provide workload-aware scheduling. + // PodGroupManager provides an interface for runtime information about pod groups from the scheduler's cache. PodGroupManager() PodGroupManager // SignPod creates a PodSignature for a pod. diff --git a/staging/src/k8s.io/kube-scheduler/framework/listers.go b/staging/src/k8s.io/kube-scheduler/framework/listers.go index 996e1741319..903453b640e 100644 --- a/staging/src/k8s.io/kube-scheduler/framework/listers.go +++ b/staging/src/k8s.io/kube-scheduler/framework/listers.go @@ -17,8 +17,6 @@ limitations under the License. package framework import ( - "time" - v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1" storagev1 "k8s.io/api/storage/v1" @@ -50,6 +48,13 @@ type StorageInfoLister interface { type SharedLister interface { NodeInfos() NodeInfoLister StorageInfos() StorageInfoLister + PodGroupStates() PodGroupStateLister +} + +// PodGroupStateLister provides read access to pod group states. +type PodGroupStateLister interface { + // Get returns the PodGroupState of the given pod group. + Get(namespace string, podGroupName string) (PodGroupState, error) } type CSINodeLister interface { @@ -146,14 +151,13 @@ type CSIManager interface { CSINodes() CSINodeLister } -// PodGroupManager provides an interface for scheduling plugins to provide workload-aware scheduling. -// It acts as the central source of truth for runtime information about pod groups. +// PodGroupManager provides an interface for runtime information about pod groups in the scheduler cache.
type PodGroupManager interface { - // PodGroupState retrieves the runtime state for a specific pod group, identified by pod group's name and namespace. - PodGroupState(namespace string, schedulingGroup *v1.PodSchedulingGroup) (PodGroupState, error) + // PodGroupStates returns the PodGroupStateLister. + PodGroupStates() PodGroupStateLister } -// PodGroupState provides an interface to view and modify the state of a single pod group. +// PodGroupState provides an interface to view the state of a single pod group. type PodGroupState interface { // AllPods returns the UIDs of all pods known to the scheduler for this group. AllPods() sets.Set[types.UID] @@ -168,13 +172,8 @@ type PodGroupState interface { AssumedPods() sets.Set[types.UID] // AssignedPods returns the UIDs of all pods already assigned (bound) for this group. AssignedPods() sets.Set[types.UID] + // ScheduledPods returns the pods that are either assumed or assigned for this pod group. + ScheduledPods() []*v1.Pod // ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned. ScheduledPodsCount() int - // AssumePod marks a pod as having reached the Reserve stage. - AssumePod(podUID types.UID) - // ForgetPod removes a pod from the assumed state. - ForgetPod(podUID types.UID) - // SchedulingTimeout returns the remaining time until the pod group scheduling times out. - // A new deadline is created if one doesn't exist, or if the previous one has expired. 
- SchedulingTimeout() time.Duration } diff --git a/test/integration/scheduler/podgroup/podgroup_test.go b/test/integration/scheduler/podgroup/podgroup_test.go index d69e1b6f16c..ef6a58d7d48 100644 --- a/test/integration/scheduler/podgroup/podgroup_test.go +++ b/test/integration/scheduler/podgroup/podgroup_test.go @@ -30,7 +30,6 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" - "k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager" "k8s.io/kubernetes/pkg/scheduler/backend/queue" st "k8s.io/kubernetes/pkg/scheduler/testing" testutils "k8s.io/kubernetes/test/integration/util" @@ -393,8 +392,6 @@ func TestPodGroupScheduling(t *testing.T) { features.GangScheduling: true, }) - podgroupmanager.DefaultSchedulingTimeoutDuration = 5 * time.Second - testCtx := testutils.InitTestSchedulerWithNS(t, "podgroup-scheduling", // disable backoff scheduler.WithPodMaxBackoffSeconds(0),