snapshot pod group state before scheduling cycle and embed pod group manager into cache

This commit is contained in:
Omar Sayed 2026-03-09 02:54:22 +00:00 committed by iomarsayed
parent 2fd6c47656
commit e1b18e34ff
31 changed files with 1593 additions and 909 deletions

View file

@ -41,8 +41,8 @@ var (
// New returns a Cache implementation.
// It automatically starts a go routine that exports cache metrics.
// "ctx" is the context that would close the background goroutine.
func New(ctx context.Context, apiDispatcher fwk.APIDispatcher) Cache {
cache := newCache(ctx, updateMetricsPeriod, apiDispatcher)
func New(ctx context.Context, apiDispatcher fwk.APIDispatcher, genericWorkloadEnabled bool) Cache {
cache := newCache(ctx, updateMetricsPeriod, apiDispatcher, genericWorkloadEnabled)
cache.run()
return cache
}
@ -74,7 +74,10 @@ type cacheImpl struct {
nodeTree *nodeTree
// A map from image name to its ImageStateSummary.
imageStates map[string]*fwk.ImageStateSummary
// podGroupStates stores the runtime state for each known pod group (only if GenericWorkload feature gate is enabled).
podGroupStates map[podGroupKey]*podGroupState
// genericWorkloadEnabled stores the GenericWorkload feature gate value.
genericWorkloadEnabled bool
// apiDispatcher is used for the methods that are expected to send API calls.
// It's non-nil only if the SchedulerAsyncAPICalls feature gate is enabled.
apiDispatcher fwk.APIDispatcher
@ -84,18 +87,20 @@ type podState struct {
pod *v1.Pod
}
func newCache(ctx context.Context, period time.Duration, apiDispatcher fwk.APIDispatcher) *cacheImpl {
func newCache(ctx context.Context, period time.Duration, apiDispatcher fwk.APIDispatcher, genericWorkloadEnabled bool) *cacheImpl {
logger := klog.FromContext(ctx)
return &cacheImpl{
period: period,
stop: ctx.Done(),
nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(logger, nil),
assumedPods: sets.New[string](),
podStates: make(map[string]*podState),
imageStates: make(map[string]*fwk.ImageStateSummary),
apiDispatcher: apiDispatcher,
nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(logger, nil),
assumedPods: sets.New[string](),
podStates: make(map[string]*podState),
imageStates: make(map[string]*fwk.ImageStateSummary),
podGroupStates: make(map[podGroupKey]*podGroupState),
genericWorkloadEnabled: genericWorkloadEnabled,
apiDispatcher: apiDispatcher,
}
}
@ -284,9 +289,32 @@ func (cache *cacheImpl) UpdateSnapshot(logger klog.Logger, nodeSnapshot *Snapsho
return errors.New(errMsg)
}
// Take a snapshot of pod group states for this scheduling cycle.
cache.updatePodGroupStateSnapshot(nodeSnapshot)
return nil
}
// updatePodGroupStateSnapshot updates the pod group state portion of the given snapshot.
// It assumes that the cache lock is already held.
// It removes entries that no longer exist in the live cache
// and clones entries whose generation has advanced since the last snapshot.
func (cache *cacheImpl) updatePodGroupStateSnapshot(snapshot *Snapshot) {
// Remove pod group states from snapshot that no longer exist in cache.
for key := range snapshot.podGroupStates {
if _, exists := cache.podGroupStates[key]; !exists {
delete(snapshot.podGroupStates, key)
}
}
// Clone only pod group states that changed since the last snapshot.
for key, podGroupState := range cache.podGroupStates {
if existing, ok := snapshot.podGroupStates[key]; ok && existing.generation == podGroupState.generation {
continue
}
snapshot.podGroupStates[key] = podGroupState.snapshot()
}
}
func (cache *cacheImpl) updateNodeInfoSnapshotList(logger klog.Logger, snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]fwk.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]fwk.NodeInfo, 0, cache.nodeTree.numNodes)
@ -400,7 +428,7 @@ func (cache *cacheImpl) ForgetPod(logger klog.Logger, pod *v1.Pod) error {
}
// Only assumed pod can be forgotten.
if cache.assumedPods.Has(key) {
return cache.removePod(logger, pod)
return cache.removePod(logger, pod, true)
}
return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod))
}
@ -425,12 +453,21 @@ func (cache *cacheImpl) addPod(logger klog.Logger, pod *v1.Pod, assumePod bool)
if assumePod {
cache.assumedPods.Insert(key)
}
if !cache.isPodGroupMember(pod) {
return nil
}
if assumePod {
cache.assumePodGroupMember(pod)
} else {
cache.addPodGroupMember(pod)
}
return nil
}
// Assumes that lock is already acquired.
func (cache *cacheImpl) updatePod(logger klog.Logger, oldPod, newPod *v1.Pod) error {
if err := cache.removePod(logger, oldPod); err != nil {
if err := cache.removePod(logger, oldPod, false); err != nil {
return err
}
return cache.addPod(logger, newPod, false)
@ -440,7 +477,7 @@ func (cache *cacheImpl) updatePod(logger klog.Logger, oldPod, newPod *v1.Pod) er
// Removes a pod from the cached node info. If the node information was already
// removed and there are no more pods left in the node, cleans up the node from
// the cache.
func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod) error {
func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod, forgetPod bool) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
@ -462,6 +499,16 @@ func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod) error {
delete(cache.podStates, key)
delete(cache.assumedPods, key)
if !cache.isPodGroupMember(pod) {
return nil
}
if forgetPod {
cache.forgetPodGroupMember(logger, pod)
} else {
cache.removePodGroupMember(pod)
}
return nil
}
@ -546,7 +593,7 @@ func (cache *cacheImpl) RemovePod(logger klog.Logger, pod *v1.Pod) error {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
return cache.removePod(logger, currState.pod)
return cache.removePod(logger, currState.pod, false)
}
func (cache *cacheImpl) IsAssumedPod(pod *v1.Pod) (bool, error) {
@ -714,6 +761,130 @@ func (cache *cacheImpl) updateMetrics() {
metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))
}
// isPodGroupMember returns true if the pod belongs to a pod group,
// provided that GenericWorkload feature gate is enabled.
func (cache *cacheImpl) isPodGroupMember(pod *v1.Pod) bool {
return cache.genericWorkloadEnabled && pod.Spec.SchedulingGroup != nil
}
// AddPodGroupMember adds not assigned and not assumed pod to its pod group state in the cache.
func (cache *cacheImpl) AddPodGroupMember(pod *v1.Pod) {
if !cache.isPodGroupMember(pod) {
return
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.addPodGroupMember(pod)
}
// UpdatePodGroupMember updates a pod's entry inside its pod group state in the cache.
func (cache *cacheImpl) UpdatePodGroupMember(logger klog.Logger, oldPod, newPod *v1.Pod) {
if !cache.isPodGroupMember(newPod) {
return
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.updatePodGroupMember(logger, oldPod, newPod)
}
// RemovePodGroupMember removes the pod from its pod group state in the cache.
func (cache *cacheImpl) RemovePodGroupMember(pod *v1.Pod) {
if !cache.isPodGroupMember(pod) {
return
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.removePodGroupMember(pod)
}
// addPodGroupMember adds the pod to its pod group state, creating the group entry if it doesn't exist yet.
// Assumes that the cache lock is already held.
func (cache *cacheImpl) addPodGroupMember(pod *v1.Pod) {
key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName)
podGroupState, exists := cache.podGroupStates[key]
if !exists {
podGroupState = newPodGroupState()
cache.podGroupStates[key] = podGroupState
}
podGroupState.addPod(pod)
}
// updatePodGroupMember updates the pod entry inside its pod group state.
// Assumes that the cache lock is already held.
func (cache *cacheImpl) updatePodGroupMember(logger klog.Logger, oldPod, newPod *v1.Pod) {
key := newPodGroupKey(newPod.Namespace, *newPod.Spec.SchedulingGroup.PodGroupName)
podGroupState, exists := cache.podGroupStates[key]
if !exists {
// This should not happen: the pod group state should have been already created by a prior pod add action.
utilruntime.HandleErrorWithLogger(logger, nil, "Pod group state not found for update, this indicates a missed add event", "pod", klog.KObj(newPod), "podGroupKey", key)
return
}
podGroupState.updatePod(oldPod, newPod)
}
// removePodGroupMember removes the pod from its pod group state, deleting the group entry when empty.
// Assumes that the cache lock is already held.
func (cache *cacheImpl) removePodGroupMember(pod *v1.Pod) {
key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName)
podGroupState, exists := cache.podGroupStates[key]
if !exists {
return
}
podGroupState.deletePod(pod.UID)
if podGroupState.empty() {
delete(cache.podGroupStates, key)
}
}
// assumePodGroupMember marks the pod as assumed in its pod group state.
// Assumes that the cache lock is already held.
func (cache *cacheImpl) assumePodGroupMember(pod *v1.Pod) {
key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName)
podGroupState, exists := cache.podGroupStates[key]
if !exists {
podGroupState = newPodGroupState()
cache.podGroupStates[key] = podGroupState
podGroupState.allPods[pod.UID] = pod
}
podGroupState.assumePod(pod.UID)
}
// forgetPodGroupMember moves the pod back from assumed to unscheduled in its pod group state.
// Assumes that the cache lock is already held.
func (cache *cacheImpl) forgetPodGroupMember(logger klog.Logger, pod *v1.Pod) {
key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName)
pgs, exists := cache.podGroupStates[key]
if !exists {
// This should not happen: the pod group state should have been already created by a prior pod add or assume action.
utilruntime.HandleErrorWithLogger(logger, nil, "Pod group state not found for forget, this indicates a missed add or assume event", "pod", klog.KObj(pod), "podGroupKey", key)
return
}
pgs.forgetPod(pod.UID)
}
// PodGroupStates returns the PodGroupStateLister for this cache.
func (cache *cacheImpl) PodGroupStates() fwk.PodGroupStateLister {
return cache
}
// Get returns the pod group state for the given pod group.
func (cache *cacheImpl) Get(namespace string, podGroupName string) (fwk.PodGroupState, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
key := newPodGroupKey(namespace, podGroupName)
podGroupState, exists := cache.podGroupStates[key]
if !exists {
return nil, fmt.Errorf("pod group state not found for pod group %s", key)
}
return podGroupState, nil
}
// BindPod handles the pod binding by adding a bind API call to the dispatcher.
// This method should be used only if the SchedulerAsyncAPICalls feature gate is enabled.
func (cache *cacheImpl) BindPod(binding *v1.Binding) (<-chan error, error) {

View file

@ -47,6 +47,12 @@ var nodeInfoCmpOpts = []cmp.Option{
cmpopts.IgnoreFields(framework.PodInfo{}, "cachedResource"),
}
var podGroupStateCmpOpts = []cmp.Option{
cmp.AllowUnexported(podGroupStateSnapshot{}, podGroupStateData{}, podGroupKey{}),
cmpopts.IgnoreFields(podGroupStateData{}, "generation"),
cmpopts.EquateEmpty(),
}
func init() {
metrics.Register()
}
@ -235,7 +241,7 @@ func TestAssumePodScheduled(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, pod := range tc.pods {
if err := cache.AssumePod(logger, pod); err != nil {
t.Fatalf("AssumePod failed: %v", err)
@ -296,7 +302,7 @@ func TestAddPodWillConfirm(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, podToAssume := range test.podsToAssume {
if err := cache.AssumePod(logger, podToAssume); err != nil {
t.Fatalf("assumePod failed: %v", err)
@ -349,7 +355,7 @@ func TestDump(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, podToAssume := range test.podsToAssume {
if err := cache.AssumePod(logger, podToAssume); err != nil {
t.Errorf("assumePod failed: %v", err)
@ -415,7 +421,7 @@ func TestAddPodAlwaysUpdatesPodInfoInNodeInfo(t *testing.T) {
},
}
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, podToAssume := range test.podsToAssume {
if err := cache.AssumePod(logger, podToAssume); err != nil {
t.Fatalf("assumePod failed: %v", err)
@ -471,7 +477,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, podToAssume := range test.podsToAssume {
if err := cache.AssumePod(logger, podToAssume); err != nil {
t.Fatalf("assumePod failed: %v", err)
@ -540,7 +546,7 @@ func TestUpdatePod(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, podToAdd := range test.podsToAdd {
if err := cache.AddPod(logger, podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
@ -600,7 +606,7 @@ func TestUpdatePodAndGet(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
// trying to get an unknown pod should return an error
// podToUpdate has not been added yet
if _, err := cache.GetPod(tc.podToUpdate); err == nil {
@ -666,7 +672,7 @@ func TestEphemeralStorageResource(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
if err := cache.AddPod(logger, test.pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
@ -683,6 +689,390 @@ func TestEphemeralStorageResource(t *testing.T) {
}
}
func Test_AddPodGroupMember(t *testing.T) {
podGroupName := "pg"
// Pod with no pod group name.
pod1 := st.MakePod().Namespace("namespace").Name("non-workload-pod").Obj()
// Unscheduled pod with a pod group name.
pod2 := st.MakePod().Namespace("namespace").Name("unscheduled-pod").PodGroupName(podGroupName).Obj()
// Assigned pod with the same pod group name.
pod3 := st.MakePod().Namespace("namespace").Name("assigned-pod").Node("node1").PodGroupName(podGroupName).Obj()
tests := []struct {
name string
pod *v1.Pod
genericWorkloadEnabled bool
expectInUnscheduledPods bool
expectInAssignedPods bool
}{
{
name: "generic workload disabled",
pod: pod2,
genericWorkloadEnabled: false,
},
{
name: "pod with no pod group name",
pod: pod1,
genericWorkloadEnabled: true,
},
{
name: "unscheduled pod with a pod group name",
pod: pod2,
genericWorkloadEnabled: true,
expectInUnscheduledPods: true,
},
{
name: "assigned pod with a pod group name",
pod: pod3,
genericWorkloadEnabled: true,
expectInAssignedPods: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := newCache(context.Background(), time.Second, nil, tt.genericWorkloadEnabled)
cache.AddPodGroupMember(tt.pod)
if tt.pod.Spec.SchedulingGroup == nil {
if tt.expectInAssignedPods || tt.expectInUnscheduledPods {
t.Errorf("Expected pod group to exist, but pod has no pod group")
}
return
}
podGroupState, err := cache.PodGroupStates().Get(tt.pod.Namespace, *tt.pod.Spec.SchedulingGroup.PodGroupName)
if err != nil {
if tt.genericWorkloadEnabled {
t.Errorf("Expected pod group to exist, but got error: %v", err)
}
return
}
_, inUnscheduledPods := podGroupState.UnscheduledPods()[tt.pod.Name]
if inUnscheduledPods != tt.expectInUnscheduledPods {
t.Errorf("expected pod in UnscheduledPods: %v, got %v", tt.expectInUnscheduledPods, inUnscheduledPods)
}
if inAssignedPods := podGroupState.AssignedPods().Has(tt.pod.UID); inAssignedPods != tt.expectInAssignedPods {
t.Errorf("expected pod in AssignedPods: %v, got %v", tt.expectInAssignedPods, inAssignedPods)
}
})
}
}
func Test_UpdatePodGroupMember(t *testing.T) {
podGroupName := "pg"
// unscheduled pod with a pod group name
pod := st.MakePod().Namespace("namespace").Name("unscheduled-pod").UID("pod1").
PodGroupName(podGroupName).Obj()
// updated unscheduled pod with a pod group name
updatedPod := st.MakePod().Namespace("namespace").Name("unscheduled-pod").UID("pod1").
Labels(map[string]string{"foo": "bar"}).PodGroupName(podGroupName).Obj()
// assigned pod with a pod group name
assignedPod := st.MakePod().Namespace("namespace").Name("assigned-pod").UID("pod2").Node("node").PodGroupName(podGroupName).Obj()
// pod with no pod group name
noPodGroupPod := st.MakePod().Namespace("namespace").Name("no-pod-group-pod").UID("pod3").Obj()
// updated pod with no pod group name
updatedNoPodGroupPod := st.MakePod().Namespace("namespace").Name("no-pod-group-pod").UID("pod3").
Labels(map[string]string{"foo": "bar"}).Obj()
tests := []struct {
name string
isAssumedPod bool
oldPod *v1.Pod
newPod *v1.Pod
genericWorkloadEnabled bool
expectInAssumedPods bool
expectInUnscheduledPods bool
expectInAssignedPods bool
}{
{
name: "updating a pod with genericWorkload disabled should be a no-op",
oldPod: pod,
newPod: updatedPod,
genericWorkloadEnabled: false,
expectInUnscheduledPods: true,
},
{
name: "update a pod with no pod group name should be a no-op",
oldPod: noPodGroupPod,
newPod: updatedNoPodGroupPod,
genericWorkloadEnabled: true,
},
{
name: "update a pod",
isAssumedPod: true,
oldPod: pod,
newPod: updatedPod,
genericWorkloadEnabled: true,
expectInUnscheduledPods: true,
},
{
name: "update a pod, move to assigned",
isAssumedPod: true,
oldPod: pod,
newPod: assignedPod,
genericWorkloadEnabled: true,
expectInAssignedPods: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := newCache(context.Background(), time.Second, nil, true)
cache.AddPodGroupMember(tt.oldPod)
cache.genericWorkloadEnabled = tt.genericWorkloadEnabled
newPod := tt.newPod
if newPod == nil {
newPod = tt.oldPod
}
cache.UpdatePodGroupMember(logger, tt.oldPod, newPod)
if newPod.Spec.SchedulingGroup == nil {
if tt.expectInAssumedPods || tt.expectInUnscheduledPods || tt.expectInAssignedPods {
t.Errorf("Expected pod group to exist, but pod has no SchedulingGroup")
}
return
}
podGroupState, err := cache.PodGroupStates().Get(newPod.Namespace, *newPod.Spec.SchedulingGroup.PodGroupName)
if err != nil {
return
}
_, inUnscheduledPods := podGroupState.UnscheduledPods()[newPod.Name]
if inUnscheduledPods != tt.expectInUnscheduledPods {
t.Errorf("expected pod in UnscheduledPods: %v, got %v", tt.expectInUnscheduledPods, inUnscheduledPods)
}
if inAssignedPods := podGroupState.AssignedPods().Has(newPod.UID); inAssignedPods != tt.expectInAssignedPods {
t.Errorf("expected pod in AssignedPods: %v, got %v", tt.expectInAssignedPods, inAssignedPods)
}
if inAssumedPods := podGroupState.AssumedPods().Has(newPod.UID); inAssumedPods != tt.expectInAssumedPods {
t.Errorf("expected pod in AssumedPods: %v, got %v", tt.expectInAssumedPods, inAssumedPods)
}
if !tt.genericWorkloadEnabled {
return
}
podGroupKey := newPodGroupKey(newPod.Namespace, *newPod.Spec.SchedulingGroup.PodGroupName)
gotPod := cache.podGroupStates[podGroupKey].allPods[newPod.UID]
if diff := cmp.Diff(tt.newPod, gotPod); diff != "" {
t.Errorf("stored pod does not match newPod (-want +got):\n%s", diff)
}
})
}
}
func Test_RemovePodGroupMember(t *testing.T) {
podGroupName := "pg"
pod1 := st.MakePod().Namespace("namespace").Name("unscheduled-pod").UID("pod1").
PodGroupName(podGroupName).Obj()
pod2 := st.MakePod().Namespace("namespace").Name("assigned-pod").UID("pod2").Node("node").
PodGroupName(podGroupName).Obj()
tests := []struct {
name string
initPods []*v1.Pod
podToDelete *v1.Pod
expectPodGroupStateCount int
genericWorkloadEnabled bool
}{
{
name: "remove a pod from a group with multiple pods",
initPods: []*v1.Pod{pod1, pod2},
podToDelete: pod1,
expectPodGroupStateCount: 1,
genericWorkloadEnabled: true,
},
{
name: "remove a last pod from a group",
initPods: []*v1.Pod{pod1},
podToDelete: pod1,
expectPodGroupStateCount: 0,
genericWorkloadEnabled: true,
},
{
name: "remove a non-existent pod from a group should be a no-op",
podToDelete: pod1,
expectPodGroupStateCount: 0,
genericWorkloadEnabled: true,
},
{
name: "remove a non-existent pod from a group should be a no-op",
podToDelete: pod1,
expectPodGroupStateCount: 0,
genericWorkloadEnabled: true,
},
{
name: "remove a pod while generic workload disabled should be a no-op",
initPods: []*v1.Pod{pod1},
expectPodGroupStateCount: 0,
podToDelete: pod1,
genericWorkloadEnabled: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := newCache(context.Background(), time.Second, nil, tt.genericWorkloadEnabled)
for _, pod := range tt.initPods {
cache.AddPodGroupMember(pod)
}
cache.RemovePodGroupMember(tt.podToDelete)
podGroupStateCount := len(cache.podGroupStates)
if podGroupStateCount != tt.expectPodGroupStateCount {
t.Errorf("expected %d pod groups remaining, got %d", tt.expectPodGroupStateCount, podGroupStateCount)
}
if podGroupStateCount == 0 {
return
}
podGroupState, err := cache.PodGroupStates().Get(tt.podToDelete.Namespace, *tt.podToDelete.Spec.SchedulingGroup.PodGroupName)
if err != nil {
t.Fatalf("Unexpected error getting pod group state: %v", err)
}
if podGroupState.AllPods().Has(tt.podToDelete.UID) {
t.Errorf("Expected pod %s to be deleted from pod group but it still exists", tt.podToDelete.UID)
}
})
}
}
// TestUpdatePodGroupStateSnapshot tests that pod group states of the snapshot have
// their data and generations updated properly.
func TestUpdatePodGroupStateSnapshot(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
cache := newCache(ctx, time.Second, nil, true)
podGroupName1 := "pg1"
podGroupName2 := "pg2"
pod1 := st.MakePod().Namespace("ns").Name("pod1").UID("uid1").PodGroupName(podGroupName1).Obj()
pod2 := st.MakePod().Namespace("ns").Name("pod2").UID("uid2").PodGroupName(podGroupName1).Obj()
pod3 := st.MakePod().Namespace("ns").Name("pod3").UID("uid3").PodGroupName(podGroupName2).Obj()
snapshot := NewEmptySnapshot()
tests := []struct {
name string
action func()
expectedPods []*v1.Pod
}{
{
name: "add a pod group member and update snapshot",
action: func() { cache.AddPodGroupMember(pod1) },
expectedPods: []*v1.Pod{pod1},
},
{
name: "add a pod with different pod group and update snapshot",
action: func() { cache.AddPodGroupMember(pod3) },
expectedPods: []*v1.Pod{pod1, pod3},
},
{
name: "remove a last pod group member and update snapshot",
action: func() { cache.RemovePodGroupMember(pod1) },
expectedPods: []*v1.Pod{pod3},
},
{
name: "add a pod to a recently deleted pod group and update snapshot",
action: func() { cache.AddPodGroupMember(pod2) },
expectedPods: []*v1.Pod{pod2, pod3},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Capture cache generations before snapshot update to detect which pod groups are going to be modified.
prevCacheGenerations := make(map[podGroupKey]int64, len(cache.podGroupStates))
for key, pgs := range cache.podGroupStates {
prevCacheGenerations[key] = pgs.generation
}
tt.action()
if err := cache.UpdateSnapshot(logger, snapshot); err != nil {
t.Fatalf("UpdateSnapshot failed: %v", err)
}
// For each pod group that the action modified (its cache generation advanced), the snapshot generation must have advanced too.
// Unmodified pod groups keep their previous generation.
for key, pgs := range snapshot.podGroupStates {
cachePgs, ok := cache.podGroupStates[key]
if !ok {
continue
}
if cachePgs.generation > prevCacheGenerations[key] {
if pgs.generation <= prevCacheGenerations[key] {
t.Errorf("pod group %s was modified but snapshot generation (%d) was not incremented (%d)", key, pgs.generation, prevCacheGenerations[key])
}
}
}
expectedPodGroupStatesSnapshot := createPodGroupStates(tt.expectedPods)
if diff := cmp.Diff(expectedPodGroupStatesSnapshot, snapshot.podGroupStates, podGroupStateCmpOpts...); diff != "" {
t.Errorf("snapshot data mismatch (-want +got):\n%s", diff)
}
})
}
}
// Test_BindingPodGroupMember simulates binding and tests that when an assumed pod
// gets bound, its state within pod group transitions from assumed to assigned.
func Test_BindingPodGroupMember(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
cache := newCache(ctx, time.Second, nil, true)
podGroupName := "pg"
pod := st.MakePod().Namespace("namespace").Name("pod1").UID("pod1-uid").
PodGroupName(podGroupName).Obj()
// Simulate the informer firing an Add event for an unscheduled
// pod (no NodeName set) reflecting on PodGroupStates.
cache.AddPodGroupMember(pod)
// Simulate the scheduler assuming the pod on a node.
assumedPod := pod.DeepCopy()
assumedPod.Spec.NodeName = "node1"
if err := cache.AssumePod(logger, assumedPod); err != nil {
t.Fatalf("AssumePod failed: %v", err)
}
podGroupState, err := cache.PodGroupStates().Get(pod.Namespace, podGroupName)
if err != nil {
t.Fatalf("Unexpected error getting pod group state after AssumePod: %v", err)
}
if !podGroupState.AssumedPods().Has(assumedPod.UID) {
t.Errorf("Expected pod to be in AssumedPods after AssumePod")
}
if podGroupState.AssignedPods().Has(assumedPod.UID) {
t.Errorf("Expected pod NOT to be in AssignedPods after AssumePod")
}
// Simulate binding confirmation: the informer fires an Add event with NodeName set.
if err := cache.AddPod(logger, assumedPod); err != nil {
t.Fatalf("AddPod (binding confirmation) failed: %v", err)
}
podGroupState, err = cache.PodGroupStates().Get(pod.Namespace, podGroupName)
if err != nil {
t.Fatalf("Unexpected error getting pod group state after AddPod: %v", err)
}
if podGroupState.AssumedPods().Has(assumedPod.UID) {
t.Errorf("Expected pod not to be in AssumedPods after binding confirmation")
}
if !podGroupState.AssignedPods().Has(assumedPod.UID) {
t.Errorf("Expected pod to be in AssignedPods after binding confirmation")
}
}
// TestRemovePod tests after added pod is removed, its information should also be subtracted.
func TestRemovePod(t *testing.T) {
pod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
@ -720,7 +1110,7 @@ func TestRemovePod(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodeName := pod.Spec.NodeName
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
// Add/Assume pod succeeds even before adding the nodes.
if tt.assume {
if err := cache.AddPod(logger, pod); err != nil {
@ -768,7 +1158,7 @@ func TestForgetPod(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, pod := range pods {
if err := cache.AssumePod(logger, pod); err != nil {
t.Fatalf("assumePod failed: %v", err)
@ -981,7 +1371,7 @@ func TestNodeOperators(t *testing.T) {
imageStates := buildImageStates(tc.nodes)
expected := buildNodeInfo(node, tc.pods, imageStates)
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for _, nodeItem := range tc.nodes {
cache.AddNode(logger, nodeItem)
}
@ -1115,9 +1505,180 @@ func TestNodeOperators(t *testing.T) {
}
}
// TestPodGroupPodOperations tests that operations (Add, Update, Remove, Assume, Forget) on
// pods with pod group name properly update PodGroupStates only when GenericWorkload feature gate is enabled.
func TestPodGroupPodOperations(t *testing.T) {
groupName := "pg"
pod := st.MakePod().Namespace("test-ns").Name("pod-0").UID("uid-0").
PodGroupName(groupName).Obj()
type state struct {
podGroupStatesCount int
assignedCount int
unscheduledCount int
assumedCount int
}
tests := []struct {
name string
genericWorkloadEnabled bool
setup func(*testing.T, *cacheImpl, context.Context)
operation func(*testing.T, *cacheImpl, context.Context)
expected state
}{
{
name: "AddPod with GenericWorkload disabled",
genericWorkloadEnabled: false,
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 0},
},
{
name: "AddPod with GenericWorkload enabled",
genericWorkloadEnabled: true,
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 1, unscheduledCount: 1, assignedCount: 0, assumedCount: 0},
},
{
name: "AssumePod with GenericWorkload disabled",
genericWorkloadEnabled: false,
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AssumePod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 0},
},
{
name: "AssumePod with GenericWorkload enabled",
genericWorkloadEnabled: true,
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AssumePod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 1, assignedCount: 0, unscheduledCount: 0, assumedCount: 1},
},
{
name: "ForgetPod with GenericWorkload disabled",
genericWorkloadEnabled: false,
setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AssumePod failed: %v", err)
}
},
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.ForgetPod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("ForgetPod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 1, assignedCount: 0, unscheduledCount: 0, assumedCount: 1},
},
{
name: "ForgetPod with GenericWorkload enabled",
genericWorkloadEnabled: true,
setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AssumePod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AssumePod failed: %v", err)
}
},
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.ForgetPod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("ForgetPod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 1, unscheduledCount: 1},
},
{
name: "RemovePod with GenericWorkload disabled",
genericWorkloadEnabled: false,
setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
},
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.RemovePod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("RemovePod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 1, assignedCount: 0, unscheduledCount: 1, assumedCount: 0},
},
{
name: "RemovePod with GenericWorkload enabled",
genericWorkloadEnabled: true,
setup: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.AddPod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
},
operation: func(t *testing.T, cache *cacheImpl, ctx context.Context) {
if err := cache.RemovePod(klog.FromContext(ctx), pod); err != nil {
t.Fatalf("RemovePod failed: %v", err)
}
},
expected: state{podGroupStatesCount: 0},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Initialize cache with feature gate enabled to ensure group state is
// properly established for operations that require it.
cache := newCache(ctx, time.Second, nil, true)
if tt.setup != nil {
tt.setup(t, cache, ctx)
}
cache.genericWorkloadEnabled = tt.genericWorkloadEnabled
tt.operation(t, cache, ctx)
if count := len(cache.podGroupStates); count != tt.expected.podGroupStatesCount {
t.Errorf("expected %d pod group states, got %d", tt.expected.podGroupStatesCount, count)
}
if tt.expected.podGroupStatesCount == 0 {
return
}
pgs, err := cache.PodGroupStates().Get("test-ns", groupName)
if err != nil {
t.Fatalf("unexpected error getting pod group state: %v", err)
}
assignedCount := pgs.AssignedPods().Len()
if assignedCount != tt.expected.assignedCount {
t.Errorf("expected %d pods in assignedPods, got %d", tt.expected.assignedCount, assignedCount)
}
unscheduledCount := len(pgs.UnscheduledPods())
if unscheduledCount != tt.expected.unscheduledCount {
t.Errorf("expected %d pods in unscheduledPods, got %d", tt.expected.unscheduledCount, unscheduledCount)
}
assumedCount := pgs.AssumedPods().Len()
if assumedCount != tt.expected.assumedCount {
t.Errorf("expected %d pods in assumedPods, got %d", tt.expected.assumedCount, assumedCount)
}
})
}
}
func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
var podGroupName = "pg"
// Create a few nodes to be used in tests.
var nodes []*v1.Node
for i := 0; i < 10; i++ {
@ -1175,6 +1736,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
PVC(fmt.Sprintf("test-pvc%v", pvcID)).Node(fmt.Sprintf("test-node%v", node)).Obj()
}
// Add a few pods with a pod group name
var podsWithPodGroupName []*v1.Pod
for i := range 20 {
pod := st.MakePod().Name(fmt.Sprintf("p-podgroup-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-podgroup-%v", i)).
PodGroupName(fmt.Sprintf("%s-%v", podGroupName, i)).
Node(fmt.Sprintf("test-node%v", i)).Obj()
podsWithPodGroupName = append(podsWithPodGroupName, pod)
}
var cache *cacheImpl
var snapshot *Snapshot
type operation = func(t *testing.T)
@ -1218,6 +1788,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
}
}
addPodWithPodGroupName := func(i int) operation {
return func(t *testing.T) {
if err := cache.AddPod(logger, podsWithPodGroupName[i]); err != nil {
t.Error(err)
}
}
}
removePod := func(i int) operation {
return func(t *testing.T) {
if err := cache.RemovePod(logger, pods[i]); err != nil {
@ -1240,6 +1817,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
}
}
removePodWithPodGroupName := func(i int) operation {
return func(t *testing.T) {
if err := cache.RemovePod(logger, podsWithPodGroupName[i]); err != nil {
t.Error(err)
}
}
}
updatePod := func(i int) operation {
return func(t *testing.T) {
if err := cache.UpdatePod(logger, pods[i], updatedPods[i]); err != nil {
@ -1247,6 +1831,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
}
}
updatePodWithPodGroupName := func(i int) operation {
return func(t *testing.T) {
if err := cache.UpdatePod(logger, podsWithPodGroupName[i], podsWithPodGroupName[i]); err != nil {
t.Error(err)
}
}
}
assumePod := func(i int) operation {
return func(t *testing.T) {
if err := cache.AssumePod(logger, pods[i]); err != nil {
@ -1348,11 +1939,12 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
tests := []struct {
name string
operations []operation
expected []*v1.Node
expectedHavePodsWithAffinity int
expectedUsedPVCSet sets.Set[string]
name string
operations []operation
expected []*v1.Node
expectedHavePodsWithAffinity int
expectedPodGroupStatesSnapshot map[podGroupKey]*podGroupStateSnapshot
expectedUsedPVCSet sets.Set[string]
}{
{
name: "Empty cache",
@ -1547,6 +2139,33 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.New("test-ns/test-pvc2", "test-ns/test-pvc3"),
},
{
name: "Add, Update and Remove multiple pods with SchedulingGroup",
operations: []operation{
addNode(0), addNode(1), addNode(2), addPodWithPodGroupName(0), addPodWithPodGroupName(1),
addPodWithPodGroupName(2), updateSnapshot(),
updatePodWithPodGroupName(0), removePodWithPodGroupName(1), updateSnapshot(),
},
expected: []*v1.Node{nodes[1], nodes[0], nodes[2]},
expectedPodGroupStatesSnapshot: map[podGroupKey]*podGroupStateSnapshot{
newPodGroupKey("test-ns", "pg-0"): {
podGroupStateData: podGroupStateData{
allPods: map[types.UID]*v1.Pod{"puid-podgroup-0": podsWithPodGroupName[0]},
assignedPods: sets.New[types.UID]("puid-podgroup-0"),
unscheduledPods: sets.New[types.UID](),
assumedPods: sets.New[types.UID](),
},
},
newPodGroupKey("test-ns", "pg-2"): {
podGroupStateData: podGroupStateData{
allPods: map[types.UID]*v1.Pod{"puid-podgroup-2": podsWithPodGroupName[2]},
assignedPods: sets.New[types.UID]("puid-podgroup-2"),
unscheduledPods: sets.New[types.UID](),
assumedPods: sets.New[types.UID](),
},
},
},
},
{
name: "Add and Remove multiple pods with PVC",
operations: []operation{
@ -1596,7 +2215,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache = newCache(ctx, time.Second, nil)
cache = newCache(ctx, time.Second, nil, true)
snapshot = NewEmptySnapshot()
for _, op := range test.operations {
@ -1619,6 +2238,11 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
}
// Check pod group states in the snapshot.
if diff := cmp.Diff(test.expectedPodGroupStatesSnapshot, snapshot.podGroupStates, podGroupStateCmpOpts...); diff != "" {
t.Errorf("unexpected podGroupStates in snapshot (-want, +got):\n%s", diff)
}
// Check number of nodes with pods with affinity
if len(snapshot.havePodsWithAffinityNodeInfoList) != test.expectedHavePodsWithAffinity {
t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList))
@ -1830,7 +2454,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache = newCache(ctx, time.Second, nil)
cache = newCache(ctx, time.Second, nil, false)
snapshot = NewEmptySnapshot()
test.operations(t)
@ -1911,7 +2535,7 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
logger, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := newCache(ctx, time.Second, nil)
cache := newCache(ctx, time.Second, nil, false)
for i := 0; i < 1000; i++ {
nodeName := fmt.Sprintf("node-%d", i)
cache.AddNode(logger, st.MakeNode().Name(nodeName).Obj())

View file

@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
@ -107,6 +108,18 @@ type Cache interface {
// BindPod handles the pod binding by adding a bind API call to the dispatcher.
// This method should be used only if the SchedulerAsyncAPICalls feature gate is enabled.
BindPod(binding *v1.Binding) (<-chan error, error)
// PodGroupStates returns a PodGroupStateLister.
PodGroupStates() fwk.PodGroupStateLister
// AddPodGroupMember adds not assigned and not assumed pod to its pod group state.
AddPodGroupMember(pod *v1.Pod)
// UpdatePodGroupMember updates a pod in its pod group state.
UpdatePodGroupMember(logger klog.Logger, oldPod, newPod *v1.Pod)
// RemovePodGroupMember removes a pod from its pod group state.
RemovePodGroupMember(pod *v1.Pod)
}
// Dump is a dump of the cache state.

View file

@ -0,0 +1,354 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"maps"
"sync"
"sync/atomic"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)
var generation atomic.Int64
// nextPodGroupGeneration increments generation numbers monotonically for a pod group state (instead of per-instance increment)
// to prevent generation reset or collision when a pod group is deleted and recreated with the same name.
func nextPodGroupGeneration() int64 {
return generation.Add(1)
}
// podGroupKey uniquely identifies a specific instance of a PodGroup.
type podGroupKey struct {
name string
namespace string
}
func (pgk podGroupKey) GetName() string {
return pgk.name
}
func (pgk podGroupKey) GetNamespace() string {
return pgk.namespace
}
func (pgk podGroupKey) String() string {
return pgk.namespace + "/" + pgk.GetName()
}
var _ klog.KMetadata = &podGroupKey{}
func newPodGroupKey(namespace string, name string) podGroupKey {
return podGroupKey{
namespace: namespace,
name: name,
}
}
// podGroupStateData holds data and functionality shared between podGroupState and podGroupStateSnapshot.
type podGroupStateData struct {
// generation gets bumped whenever the data is changed.
// It's used to detect changes and avoid unnecessary cloning when taking a snapshot.
generation int64
// allPods tracks all pods belonging to the group that are known to the scheduler.
allPods map[types.UID]*v1.Pod
// unscheduledPods tracks all pods that are unscheduled for this group,
// i.e., are neither assumed nor assigned.
unscheduledPods sets.Set[types.UID]
// assumedPods tracks pods that have reached the Reserve stage and are waiting
// for the rest of the gang to arrive before being allowed to bind.
assumedPods sets.Set[types.UID]
// assignedPods tracks all pods belonging to the group that are assigned (bound).
assignedPods sets.Set[types.UID]
}
func newPodGroupStateData() podGroupStateData {
return podGroupStateData{
allPods: make(map[types.UID]*v1.Pod),
unscheduledPods: sets.New[types.UID](),
assumedPods: sets.New[types.UID](),
assignedPods: sets.New[types.UID](),
}
}
// addPod adds the pod to this group.
// Depending on the NodeName, it can insert the pod into either assignedPods or unscheduledPods.
func (d *podGroupStateData) addPod(pod *v1.Pod) {
d.generation = nextPodGroupGeneration()
d.allPods[pod.UID] = pod
if pod.Spec.NodeName != "" {
d.assignedPods.Insert(pod.UID)
} else {
d.unscheduledPods.Insert(pod.UID)
}
}
// updatePod updates the pod in this group.
// In case of binding, it moves the pod to assignedPods.
func (d *podGroupStateData) updatePod(oldPod, newPod *v1.Pod) {
d.generation = nextPodGroupGeneration()
d.allPods[newPod.UID] = newPod
if oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != "" {
d.assignedPods.Insert(newPod.UID)
// Clear pod from unscheduled and assumed when it is assigned.
d.unscheduledPods.Delete(newPod.UID)
d.assumedPods.Delete(newPod.UID)
}
}
// deletePod removes the pod from this pod group state.
func (d *podGroupStateData) deletePod(podUID types.UID) {
d.generation = nextPodGroupGeneration()
delete(d.allPods, podUID)
d.unscheduledPods.Delete(podUID)
d.assumedPods.Delete(podUID)
d.assignedPods.Delete(podUID)
}
// assumePod marks a pod as assumed within the pod group state.
func (d *podGroupStateData) assumePod(podUID types.UID) {
pod := d.allPods[podUID]
// A scheduling pod may be removed from the cluster.
// In that case, we just ignore it.
if pod == nil {
return
}
d.generation = nextPodGroupGeneration()
// If the pod is already assigned, put it into assignedPods.
// Otherwise put it to assumedPods.
if pod.Spec.NodeName != "" {
d.assignedPods.Insert(pod.UID)
} else {
d.assumedPods.Insert(pod.UID)
}
d.unscheduledPods.Delete(pod.UID)
}
// forgetPod moves a pod back from the assumed state to unscheduled within the pod group state.
func (d *podGroupStateData) forgetPod(podUID types.UID) {
pod := d.allPods[podUID]
// A scheduling pod may be removed from the cluster.
// In that case, we just ignore it.
if pod == nil {
return
}
d.generation = nextPodGroupGeneration()
d.assumedPods.Delete(podUID)
// If the pod is already assigned, put it into assignedPods.
// Otherwise, put it into unscheduledPods.
if pod.Spec.NodeName != "" {
d.assignedPods.Insert(podUID)
} else {
d.unscheduledPods.Insert(podUID)
}
}
// scheduledPods returns the pods that are either assumed or assigned for this pod group.
func (d *podGroupStateData) scheduledPods() []*v1.Pod {
scheduledPods := make([]*v1.Pod, 0, len(d.assignedPods)+len(d.assumedPods))
for uid := range d.assignedPods {
scheduledPods = append(scheduledPods, d.allPods[uid])
}
for uid := range d.assumedPods {
scheduledPods = append(scheduledPods, d.allPods[uid])
}
return scheduledPods
}
// empty returns true when the pod group state contains no pods.
func (d *podGroupStateData) empty() bool {
return len(d.allPods) == 0
}
// allPodsCount returns the number of all pods known to the scheduler for this group.
func (d *podGroupStateData) allPodsCount() int {
return len(d.allPods)
}
// scheduledPodsCount returns the number of pods for this group that are either assumed or assigned.
func (d *podGroupStateData) scheduledPodsCount() int {
return len(d.assumedPods) + len(d.assignedPods)
}
// deepCopy returns a deep copy of the pod group state data.
func (d *podGroupStateData) deepCopy() podGroupStateData {
return podGroupStateData{
generation: d.generation,
allPods: maps.Clone(d.allPods),
unscheduledPods: d.unscheduledPods.Clone(),
assumedPods: d.assumedPods.Clone(),
assignedPods: d.assignedPods.Clone(),
}
}
// unscheduledPodsMap returns all unscheduled pods for this pod group.
func (d *podGroupStateData) unscheduledPodsMap() map[string]*v1.Pod {
result := make(map[string]*v1.Pod, len(d.unscheduledPods))
for podUID := range d.unscheduledPods {
pod := d.allPods[podUID]
result[pod.Name] = pod
}
return result
}
// podGroupState holds the runtime state of a pod group.
type podGroupState struct {
lock sync.RWMutex
podGroupStateData
}
func newPodGroupState() *podGroupState {
return &podGroupState{podGroupStateData: newPodGroupStateData()}
}
// snapshot returns a deep copy of the live pod group state as an immutable snapshot.
// It must be called under the cache lock.
func (pgs *podGroupState) snapshot() *podGroupStateSnapshot {
return &podGroupStateSnapshot{podGroupStateData: pgs.podGroupStateData.deepCopy()}
}
// empty returns true when the group contains no pods.
// It must be called under the cache lock.
func (pgs *podGroupState) empty() bool {
return pgs.podGroupStateData.empty()
}
// forgetPod moves a pod back from the assumed state to unscheduled.
// It must be called under the cache lock.
func (pgs *podGroupState) forgetPod(podUID types.UID) {
pgs.podGroupStateData.forgetPod(podUID)
}
// AllPods returns the UIDs of all pods known to the scheduler for this group.
func (pgs *podGroupState) AllPods() sets.Set[types.UID] {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return sets.KeySet(pgs.podGroupStateData.allPods)
}
// AllPodsCount returns the number of all pods known to the scheduler for this group.
func (pgs *podGroupState) AllPodsCount() int {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.podGroupStateData.allPodsCount()
}
// UnscheduledPods returns all pods that are unscheduled for this group,
// i.e., are neither assumed nor assigned.
// The returned map type corresponds to the argument of the PodActivator.Activate method.
func (pgs *podGroupState) UnscheduledPods() map[string]*v1.Pod {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.podGroupStateData.unscheduledPodsMap()
}
// AssumedPods returns the UIDs of all pods for this group in the assumed state,
// i.e., that have passed the Reserve stage.
func (pgs *podGroupState) AssumedPods() sets.Set[types.UID] {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.podGroupStateData.assumedPods.Clone()
}
// AssignedPods returns the UIDs of all pods already assigned (bound) for this group.
func (pgs *podGroupState) AssignedPods() sets.Set[types.UID] {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.podGroupStateData.assignedPods.Clone()
}
// ScheduledPods returns the pods that are either assumed or assigned for this pod group.
func (pgs *podGroupState) ScheduledPods() []*v1.Pod {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.podGroupStateData.scheduledPods()
}
// ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned.
func (pgs *podGroupState) ScheduledPodsCount() int {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.podGroupStateData.scheduledPodsCount()
}
// podGroupStateSnapshot is an immutable, point-in-time copy of a podGroupState.
// It is taken before a pod group scheduling cycle and used to track states of pods
// during the cycle without modifying the live state of pods.
type podGroupStateSnapshot struct {
podGroupStateData
}
// assumePod marks a pod within the pod group state snapshot as assumed.
func (s *podGroupStateSnapshot) assumePod(podUID types.UID) {
s.podGroupStateData.assumePod(podUID)
}
// forgetPod removes a pod from the assumed state within the snapshot.
func (s *podGroupStateSnapshot) forgetPod(podUID types.UID) {
s.podGroupStateData.forgetPod(podUID)
}
// AllPods returns the UIDs of all pods known to the scheduler for this group.
func (s *podGroupStateSnapshot) AllPods() sets.Set[types.UID] {
return sets.KeySet(s.podGroupStateData.allPods)
}
// UnscheduledPods returns all pods that are unscheduled for this group.
func (s *podGroupStateSnapshot) UnscheduledPods() map[string]*v1.Pod {
return s.podGroupStateData.unscheduledPodsMap()
}
// AssumedPods returns the UIDs of all assumed pods for this group.
func (s *podGroupStateSnapshot) AssumedPods() sets.Set[types.UID] {
return s.podGroupStateData.assumedPods
}
// AssignedPods returns the UIDs of all assigned (bound) pods for this group.
func (s *podGroupStateSnapshot) AssignedPods() sets.Set[types.UID] {
return s.podGroupStateData.assignedPods
}
// ScheduledPods returns the pods that are either assumed or assigned for this pod group.
func (s *podGroupStateSnapshot) ScheduledPods() []*v1.Pod {
return s.podGroupStateData.scheduledPods()
}
// AllPodsCount returns the number of all pods known to the scheduler for this group.
func (s *podGroupStateSnapshot) AllPodsCount() int {
return s.podGroupStateData.allPodsCount()
}
// ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned.
func (s *podGroupStateSnapshot) ScheduledPodsCount() int {
return s.podGroupStateData.scheduledPodsCount()
}

View file

@ -14,14 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package podgroupmanager
package cache
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/utils/ptr"
)
func TestPodGroupState_AssumeForget(t *testing.T) {
@ -36,7 +36,7 @@ func TestPodGroupState_AssumeForget(t *testing.T) {
t.Fatal("Pod should be initially in UnscheduledPods")
}
pgs.AssumePod(pod.UID)
pgs.assumePod(pod.UID)
if !pgs.AssumedPods().Has(pod.UID) {
t.Fatal("Pod should be in AssumedPods after AssumePod")
}
@ -44,7 +44,7 @@ func TestPodGroupState_AssumeForget(t *testing.T) {
t.Fatal("UnscheduledPods should be empty after AssumePod")
}
pgs.ForgetPod(pod.UID)
pgs.forgetPod(pod.UID)
if pgs.AssumedPods().Has(pod.UID) {
t.Fatal("Pod should not be in AssumedPods after ForgetPod")
}
@ -53,41 +53,52 @@ func TestPodGroupState_AssumeForget(t *testing.T) {
}
}
func TestPodGroupState_SchedulingTimeout(t *testing.T) {
func TestPodGroupState_Clone(t *testing.T) {
pgs := newPodGroupState()
timeout := pgs.SchedulingTimeout()
if pgs.schedulingDeadline == nil {
t.Fatal("Scheduling deadline should be set after SchedulingTimeout call, but is nil")
}
if timeout <= 0 {
t.Errorf("Expected positive timeout duration, got %v", timeout)
pod1 := st.MakePod().Namespace("ns1").Name("p1").UID("p1").
PodGroupName("pg").Obj()
pod2 := st.MakePod().Namespace("ns1").Name("p2").UID("p2").
PodGroupName("pg").Obj()
pgs.addPod(pod1)
pgs.addPod(pod2)
pgs.assumePod(pod2.UID)
snap := pgs.snapshot()
// Clone has the same generation.
if snap.generation != pgs.generation {
t.Errorf("expected clone generation %d, got %d", pgs.generation, snap.generation)
}
// Sleep for a while to ensure that the time has increased,
// especially when testing on Windows machines with lower resolution.
time.Sleep(10 * time.Millisecond)
deadline := *pgs.schedulingDeadline
newTimeout := pgs.SchedulingTimeout()
if !deadline.Equal(*pgs.schedulingDeadline) {
t.Errorf("Previous deadline should not be changed: previous: %v, current: %v", deadline, *pgs.schedulingDeadline)
}
if newTimeout >= timeout {
t.Errorf("Expected lower timeout duration: previous: %v, current: %v", timeout, newTimeout)
// Clone contains both pods.
if !snap.AllPods().Has(pod1.UID) || !snap.AllPods().Has(pod2.UID) {
t.Error("expected both pods in clone's AllPods")
}
// Sleep for a while to ensure that the time has increased,
// especially when testing on Windows machines with lower resolution.
time.Sleep(10 * time.Millisecond)
pgs.schedulingDeadline = ptr.To(time.Now().Add(-1 * time.Second))
newTimeout = pgs.SchedulingTimeout()
if deadline.Equal(*pgs.schedulingDeadline) {
t.Error("Deadline should be reset after it has expired, but it wasn't")
// Clone preserves pod1 as unscheduled.
if _, ok := snap.UnscheduledPods()[pod1.Name]; !ok {
t.Error("expected pod1 in clone's UnscheduledPods")
}
if newTimeout <= 0 {
t.Errorf("Expected positive timeout duration after reset, got %v", timeout)
// Clone preserves pod2 as assumed.
if !snap.AssumedPods().Has(pod2.UID) {
t.Error("expected pod2 in clone's AssumedPods")
}
// Mutating the clone does not affect the original.
snap.assumePod(pod1.UID)
if pgs.assumedPods.Has(pod1.UID) {
t.Error("mutation to clone should not affect original's assumedPods")
}
// Mutating the original does not affect the clone.
pod3 := st.MakePod().Namespace("ns1").Name("p3").UID("p3").
PodGroupName("pg").Obj()
pgs.addPod(pod3)
if snap.AllPods().Has(pod3.UID) {
t.Error("mutation to original should not affect clone's AllPods")
}
}
@ -121,7 +132,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
}
// Assuming a pod should move it from unscheduled to assumed, increasing the count of scheduled pods.
pgs.AssumePod(pod1.UID)
pgs.assumePod(pod1.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -130,7 +141,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
}
// Assuming a pod that is already scheduled should not change the counts.
pgs.AssumePod(pod3.UID)
pgs.assumePod(pod3.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -139,7 +150,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
}
// Assuming a pod that is not in the state should not change the counts.
pgs.AssumePod(pod4.UID)
pgs.assumePod(pod4.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -148,7 +159,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
}
// Forgetting a pod that is already scheduled should not change the counts.
pgs.ForgetPod(pod3.UID)
pgs.forgetPod(pod3.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -158,7 +169,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
// Forgetting a pod that is in the assumed state should move it back to unscheduled,
// decreasing the count of scheduled pods.
pgs.ForgetPod(pod1.UID)
pgs.forgetPod(pod1.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -167,7 +178,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
}
// Forgetting a pod that is not assumed should not change the counts.
pgs.ForgetPod(pod1.UID)
pgs.forgetPod(pod1.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -176,7 +187,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
}
// Assuming a pod again should move it back to assumed, increasing the count of scheduled pods.
pgs.AssumePod(pod2.UID)
pgs.assumePod(pod2.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -185,7 +196,7 @@ func TestPodGroupState_PodCounts(t *testing.T) {
}
// Forgetting a pod that is not in the state should not change the counts.
pgs.ForgetPod(pod4.UID)
pgs.forgetPod(pod4.UID)
if count := pgs.AllPodsCount(); count != 3 {
t.Errorf("Expected AllPodsCount to be 3, got %d", count)
}
@ -193,3 +204,37 @@ func TestPodGroupState_PodCounts(t *testing.T) {
t.Errorf("Expected ScheduledPodsCount to be 2, got %d", count)
}
}
// TestPodGroupState_ScheduledPods tests that ScheduledPods returns pods that
// are currently either assumed or assigned altogether.
func TestPodGroupState_ScheduledPods(t *testing.T) {
pgs := newPodGroupState()
unscheduledPod := st.MakePod().Namespace("ns").Name("p1").UID("p1").
PodGroupName("pg").Obj()
assumedPod := st.MakePod().Namespace("ns").Name("p2").UID("p2").
PodGroupName("pg").Obj()
assignedPod := st.MakePod().Namespace("ns").Name("p3").UID("p3").Node("node1").
PodGroupName("pg").Obj()
pgs.addPod(assignedPod)
pgs.addPod(unscheduledPod)
pgs.addPod(assumedPod)
pgs.assumePod(assumedPod.UID)
scheduledPods := pgs.ScheduledPods()
snapshot := pgs.snapshot()
pgs.assumePod(unscheduledPod.UID)
snapshotScheduledPods := snapshot.ScheduledPods()
expectedScheduledPods := []*v1.Pod{assignedPod, assumedPod}
if diff := cmp.Diff(expectedScheduledPods, scheduledPods); diff != "" {
t.Errorf("unexpected ScheduledPods result (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(expectedScheduledPods, snapshotScheduledPods); diff != "" {
t.Errorf("unexpected snapshot ScheduledPods result (-want,+got):\n%s", diff)
}
}

View file

@ -22,8 +22,10 @@ import (
v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
@ -57,12 +59,15 @@ type Snapshot struct {
// assumedPods maps a pod key to an assumed pod object during a single pod group scheduling cycle.
// This map should be emptied before the next cycle starts.
assumedPods map[string]*v1.Pod
// podGroupStates maps a pod group key to a snapshot of its state, used during a pod group scheduling cycle.
podGroupStates map[podGroupKey]*podGroupStateSnapshot
// placementNodes stores nodes that are present in the current placement.
// If placement is not set, this is nil.
// It should only be set in the pod group scheduling cycle, when checking if pod group can be scheduled within the placement.
// This field should be cleared once the pod group has been checked for the placement.
placementNodes *placementNodes
// genericWorkloadEnabled stores the GenericWorkload feature gate value.
genericWorkloadEnabled bool
}
var _ fwk.SharedLister = &Snapshot{}
@ -70,9 +75,11 @@ var _ fwk.SharedLister = &Snapshot{}
// NewEmptySnapshot initializes a Snapshot struct and returns it.
func NewEmptySnapshot() *Snapshot {
return &Snapshot{
nodeInfoMap: make(map[string]*framework.NodeInfo),
usedPVCSet: sets.New[string](),
assumedPods: make(map[string]*v1.Pod),
nodeInfoMap: make(map[string]*framework.NodeInfo),
usedPVCSet: sets.New[string](),
assumedPods: make(map[string]*v1.Pod),
podGroupStates: make(map[podGroupKey]*podGroupStateSnapshot),
genericWorkloadEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload),
}
}
@ -98,10 +105,31 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList
s.usedPVCSet = createUsedPVCSet(pods)
if s.genericWorkloadEnabled {
s.podGroupStates = createPodGroupStates(pods)
}
return s
}
// createPodGroupStates builds the initial pod group state snapshot map from a list of pods.
func createPodGroupStates(pods []*v1.Pod) map[podGroupKey]*podGroupStateSnapshot {
podGroupStates := make(map[podGroupKey]*podGroupStateSnapshot)
for _, pod := range pods {
if pod.Spec.SchedulingGroup == nil {
continue
}
key := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName)
pgs, ok := podGroupStates[key]
if !ok {
pgs = &podGroupStateSnapshot{podGroupStateData: newPodGroupStateData()}
podGroupStates[key] = pgs
}
pgs.addPod(pod)
}
return podGroupStates
}
// createNodeInfoMap obtains a list of pods and pivots that list into a map
// where the keys are node names and the values are the aggregated information
// for that node.
@ -188,6 +216,27 @@ func (s *Snapshot) StorageInfos() fwk.StorageInfoLister {
return s
}
// PodGroupStates returns a PodGroupStateLister.
func (s *Snapshot) PodGroupStates() fwk.PodGroupStateLister {
return &podGroupStateSnapshotLister{podGroupStates: s.podGroupStates}
}
var _ fwk.PodGroupStateLister = &podGroupStateSnapshotLister{}
type podGroupStateSnapshotLister struct {
podGroupStates map[podGroupKey]*podGroupStateSnapshot
}
// Get returns the pod group state from the snapshot for the given pod group.
func (l *podGroupStateSnapshotLister) Get(namespace string, podGroupName string) (fwk.PodGroupState, error) {
key := newPodGroupKey(namespace, podGroupName)
state, ok := l.podGroupStates[key]
if !ok {
return nil, fmt.Errorf("pod group state not found for pod group %s", key)
}
return state, nil
}
// NumNodesInPlacement returns the number of nodes in the snapshot for the current placement.
// If no placement is set, it returns the number of nodes in the snapshot.
// This function is not thread safe so it should be executed when no other routines can write to the snapshot.
@ -247,6 +296,14 @@ func (s *Snapshot) AssumePod(podInfo *framework.PodInfo) error {
nodeInfo.AddPodInfo(podInfo)
nodeInfo.Generation = oldGeneration
s.assumedPods[key] = pod
// Update the pod group state in the snapshot if the pod belongs to a pod group.
if !s.genericWorkloadEnabled || pod.Spec.SchedulingGroup == nil {
return nil
}
pgKey := newPodGroupKey(pod.Namespace, *pod.Spec.SchedulingGroup.PodGroupName)
if pgs, ok := s.podGroupStates[pgKey]; ok {
pgs.assumePod(pod.UID)
}
return nil
}
@ -277,6 +334,14 @@ func (s *Snapshot) ForgetPod(logger klog.Logger, pod *v1.Pod) error {
delete(s.nodeInfoMap, nodeName)
}
}
// Update the pod group state in the snapshot if the pod belongs to a pod group.
if !s.genericWorkloadEnabled || pod.Spec.SchedulingGroup == nil {
return nil
}
pgKey := newPodGroupKey(assumedPod.Namespace, *assumedPod.Spec.SchedulingGroup.PodGroupName)
if pgs, ok := s.podGroupStates[pgKey]; ok {
pgs.forgetPod(assumedPod.UID)
}
return nil
}

View file

@ -1,134 +0,0 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podgroupmanager
import (
"fmt"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
fwk "k8s.io/kube-scheduler/framework"
)
// PodGroupManager is the central source of truth for the state of pods belonging to PodGroup objects.
// It is designed to be driven explicitly by the scheduler's event handlers to ensure thread safety
// and avoid race conditions with the main scheduling queue.
// Note: The current implementation assumes that pod.Spec.SchedulingGroup is immutable.
// Allowing mutability would require changes to the manager, e.g., by properly handling pod updates.
type PodGroupManager interface {
fwk.PodGroupManager
// AddPod is called by the scheduler when a Pod/Add event is observed.
AddPod(pod *v1.Pod)
// UpdatePod is called by the scheduler when a Pod/Update event is observed.
UpdatePod(oldPod, newPod *v1.Pod)
// DeletePod is called by the scheduler when a Pod/Delete event is observed.
DeletePod(pod *v1.Pod)
}
// podGroupManager is the concrete implementation of the PodGroupManager.
type podGroupManager struct {
lock sync.RWMutex
// podGroupStates stores the runtime state for each known pod group.
podGroupStates map[podGroupKey]*podGroupState
logger klog.Logger
}
// New initializes a new pod group manager and returns it.
func New(logger klog.Logger) *podGroupManager {
return &podGroupManager{
podGroupStates: make(map[podGroupKey]*podGroupState),
logger: logger,
}
}
// AddPod adds a pod to the pod group manager if it has a scheduling group.
// Pod is added to the available pods set for its corresponding pod group.
func (pgm *podGroupManager) AddPod(pod *v1.Pod) {
if pod.Spec.SchedulingGroup == nil {
return
}
pgm.lock.Lock()
defer pgm.lock.Unlock()
key := newPodGroupKey(pod.Namespace, pod.Spec.SchedulingGroup)
state, ok := pgm.podGroupStates[key]
if !ok {
state = newPodGroupState()
pgm.podGroupStates[key] = state
}
state.addPod(pod)
}
// UpdatePod updates a pod in the pod group manager if it has a scheduling group.
// Note: The current implementation assumes that newPod.Spec.SchedulingGroup is immutable.
func (pgm *podGroupManager) UpdatePod(oldPod, newPod *v1.Pod) {
if newPod.Spec.SchedulingGroup == nil {
return
}
pgm.lock.Lock()
defer pgm.lock.Unlock()
key := newPodGroupKey(newPod.Namespace, newPod.Spec.SchedulingGroup)
state, ok := pgm.podGroupStates[key]
if !ok {
// Shouldn't happen, but handling this case gracefully.
state = newPodGroupState()
pgm.podGroupStates[key] = state
state.addPod(newPod)
pgm.logger.Error(nil, "UpdatePod found no existing PodGroup for pod. Created new PodGroup for the pod", "pod", klog.KObj(newPod), "podGroupKey", klog.KObj(key))
return
}
state.updatePod(oldPod, newPod)
}
// DeletePod removes a pod from the pod group manager if it has a scheduling group.
// Pod is removed from the pods sets for its corresponding pod group.
func (pgm *podGroupManager) DeletePod(pod *v1.Pod) {
if pod.Spec.SchedulingGroup == nil {
return
}
pgm.lock.Lock()
defer pgm.lock.Unlock()
key := newPodGroupKey(pod.Namespace, pod.Spec.SchedulingGroup)
state, ok := pgm.podGroupStates[key]
if !ok {
// The pod group may have already been cleaned up, or the pod was never added.
return
}
state.deletePod(pod.UID)
// Clean up the map entry if no pods are left in the group.
if state.empty() {
delete(pgm.podGroupStates, key)
}
}
// PodGroupState returns the runtime state of a pod group.
func (pgm *podGroupManager) PodGroupState(namespace string, schedulingGroup *v1.PodSchedulingGroup) (fwk.PodGroupState, error) {
pgm.lock.RLock()
defer pgm.lock.RUnlock()
state, ok := pgm.podGroupStates[newPodGroupKey(namespace, schedulingGroup)]
if !ok {
return nil, fmt.Errorf("internal pod group state doesn't exist for a pod's scheduling group")
}
return state, nil
}

View file

@ -1,280 +0,0 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podgroupmanager
import (
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2/ktesting"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestPodGroupManager_AddPod(t *testing.T) {
p1 := st.MakePod().Namespace("ns1").Name("p1").UID("p1").PodGroupName("pg1").Obj()
// Assigned
p2 := st.MakePod().Namespace("ns1").Name("p2").UID("p2").Node("node1").PodGroupName("pg1").Obj()
// Different ns
p3 := st.MakePod().Namespace("ns2").Name("p3").UID("p3").PodGroupName("pg1").Obj()
nonPodGroupPod := st.MakePod().Namespace("ns1").Name("non-podgroup").Obj()
tests := []struct {
name string
initPods []*v1.Pod
podToAdd *v1.Pod
expectedPodGroups int
expectInAllPods bool
expectInUnscheduledPods bool
expectInAssumedPods bool
expectInAssignedPods bool
}{
{
name: "adding an unscheduled pod",
podToAdd: p1,
expectedPodGroups: 1,
expectInAllPods: true,
expectInUnscheduledPods: true,
},
{
name: "adding an assigned pod",
podToAdd: p2,
expectedPodGroups: 1,
expectInAllPods: true,
expectInAssignedPods: true,
},
{
name: "adding pod with different namespace",
initPods: []*v1.Pod{p1},
podToAdd: p3,
expectedPodGroups: 2,
expectInAllPods: true,
expectInUnscheduledPods: true,
},
{
name: "adding a non-podgroup pod is a no-op",
podToAdd: nonPodGroupPod,
expectedPodGroups: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
manager := New(logger)
for _, p := range tt.initPods {
manager.AddPod(p)
}
manager.AddPod(tt.podToAdd)
gotPodGroups := len(manager.podGroupStates)
if gotPodGroups != tt.expectedPodGroups {
t.Fatalf("Expected %v pod group(s), got %v", tt.expectedPodGroups, gotPodGroups)
}
if gotPodGroups == 0 {
return
}
state, err := manager.PodGroupState(tt.podToAdd.Namespace, tt.podToAdd.Spec.SchedulingGroup)
if err != nil {
t.Fatalf("Unexpected error getting pod group state: %v", err)
}
if inAll := state.AllPods().Has(tt.podToAdd.UID); inAll != tt.expectInAllPods {
t.Errorf("Unexpected AllPods state, want: %v, got: %v", tt.expectInAllPods, inAll)
}
if inAssumed := state.AssumedPods().Has(tt.podToAdd.UID); inAssumed != tt.expectInAssumedPods {
t.Errorf("Unexpected AssumedPods state, want: %v, got: %v", tt.expectInAssumedPods, inAssumed)
}
if inAssigned := state.AssignedPods().Has(tt.podToAdd.UID); inAssigned != tt.expectInAssignedPods {
t.Errorf("Unexpected AssignedPods state, want: %v, got: %v", tt.expectInAssignedPods, inAssigned)
}
})
}
}
func TestPodGroupManager_UpdatePod(t *testing.T) {
pod := st.MakePod().Namespace("ns1").Name("p1").UID("p1").
PodGroupName("pg1").Obj()
updatedPod := st.MakePod().Namespace("ns1").Name("p1").UID("p1").Labels(map[string]string{"foo": "bar"}).
PodGroupName("pg1").Obj()
assignedPod := st.MakePod().Namespace("ns1").Name("p2").UID("p2").Node("node1").
PodGroupName("pg1").Obj()
updatedAssignedPod := st.MakePod().Namespace("ns1").Name("p2").UID("p2").Node("node1").Labels(map[string]string{"foo": "bar"}).
PodGroupName("pg1").Obj()
nonPodGroup := st.MakePod().Namespace("ns1").Name("non-podgroup").Obj()
updatedNonPodGroupPod := st.MakePod().Namespace("ns1").Name("non-podgroup").Labels(map[string]string{"foo": "bar"}).Obj()
tests := []struct {
name string
assumePod bool
oldPod *v1.Pod
newPod *v1.Pod
expectInAllPods bool
expectInUnscheduledPods bool
expectInAssumedPods bool
expectInAssignedPods bool
}{
{
name: "updating an unscheduled pod",
oldPod: pod,
newPod: updatedPod,
expectInAllPods: true,
expectInUnscheduledPods: true,
},
{
name: "updating an assumed pod",
assumePod: true,
oldPod: pod,
newPod: updatedPod,
expectInAllPods: true,
expectInAssumedPods: true,
},
{
name: "updating an assigned pod",
oldPod: assignedPod,
newPod: updatedAssignedPod,
expectInAllPods: true,
expectInAssignedPods: true,
},
{
name: "binding an unscheduled pod",
oldPod: pod,
newPod: assignedPod,
expectInAllPods: true,
expectInAssignedPods: true,
},
{
name: "binding an assumed pod",
assumePod: true,
oldPod: pod,
newPod: assignedPod,
expectInAllPods: true,
expectInAssignedPods: true,
},
{
name: "updating a non-podgroup pod is a no-op",
oldPod: nonPodGroup,
newPod: updatedNonPodGroupPod,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
manager := New(logger)
manager.AddPod(tt.oldPod)
if tt.assumePod {
state, err := manager.PodGroupState(tt.oldPod.Namespace, tt.oldPod.Spec.SchedulingGroup)
if err != nil {
t.Fatalf("Unexpected error getting pod group state: %v", err)
}
state.AssumePod(tt.oldPod.UID)
}
manager.UpdatePod(tt.oldPod, tt.newPod)
gotPodGroups := len(manager.podGroupStates)
if gotPodGroups == 0 {
if tt.expectInAllPods {
t.Fatalf("Expected pod group, but got none")
}
return
}
if !tt.expectInAllPods {
t.Fatalf("Expected no pod groups, but got %v", gotPodGroups)
}
state, err := manager.PodGroupState(tt.newPod.Namespace, tt.newPod.Spec.SchedulingGroup)
if err != nil {
t.Fatalf("Unexpected error getting pod group state: %v", err)
}
if inAll := state.AllPods().Has(tt.newPod.UID); inAll != tt.expectInAllPods {
t.Errorf("Unexpected AllPods state, want: %v, got: %v", tt.expectInAllPods, inAll)
}
if inAssumed := state.AssumedPods().Has(tt.newPod.UID); inAssumed != tt.expectInAssumedPods {
t.Errorf("Unexpected AssumedPods state, want: %v, got: %v", tt.expectInAssumedPods, inAssumed)
}
if inAssigned := state.AssignedPods().Has(tt.newPod.UID); inAssigned != tt.expectInAssignedPods {
t.Errorf("Unexpected AssignedPods state, want: %v, got: %v", tt.expectInAssignedPods, inAssigned)
}
})
}
}
func TestPodGroupManager_DeletePod(t *testing.T) {
p1 := st.MakePod().Namespace("ns1").Name("p1").UID("p1").PodGroupName("pg1").Obj()
p2 := st.MakePod().Namespace("ns1").Name("p2").UID("p2").PodGroupName("pg1").Obj()
tests := []struct {
name string
initPods []*v1.Pod
podToDelete *v1.Pod
expectedPodGroups int
}{
{
name: "deleting a pod from a group with multiple pods",
initPods: []*v1.Pod{p1, p2},
podToDelete: p1,
expectedPodGroups: 1,
},
{
name: "deleting the last pod cleans up the state",
initPods: []*v1.Pod{p1},
podToDelete: p1,
},
{
name: "deleting a non-existent pod is a no-op",
podToDelete: p1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
manager := New(logger)
for _, p := range tt.initPods {
manager.AddPod(p)
}
manager.DeletePod(tt.podToDelete)
gotPodGroups := len(manager.podGroupStates)
if gotPodGroups != tt.expectedPodGroups {
t.Fatalf("Expected %v pod group(s), got %v", tt.expectedPodGroups, gotPodGroups)
}
if gotPodGroups == 0 {
return
}
state, err := manager.PodGroupState(tt.podToDelete.Namespace, tt.podToDelete.Spec.SchedulingGroup)
if err != nil {
t.Fatalf("Unexpected error getting pod group state: %v", err)
}
if state.AllPodsCount() == 0 {
t.Errorf("Expected AllPods to be non-empty")
}
if state.AllPods().Has(p1.UID) {
t.Errorf("Expected pod to be deleted")
}
})
}
}

View file

@ -1,248 +0,0 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podgroupmanager
import (
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
// DefaultSchedulingTimeoutDuration defines how long the gang pods should wait at the
// Permit stage for a quorum before being rejected.
// Variable is exported only for testing purposes.
var DefaultSchedulingTimeoutDuration = 5 * time.Minute
// podGroupKey uniquely identifies a specific instance of a PodGroup.
type podGroupKey struct {
name string
namespace string
}
func (pgk podGroupKey) GetName() string {
return pgk.name
}
func (pgk podGroupKey) GetNamespace() string {
return pgk.namespace
}
var _ klog.KMetadata = &podGroupKey{}
func newPodGroupKey(namespace string, schedulingGroup *v1.PodSchedulingGroup) podGroupKey {
return podGroupKey{
name: *schedulingGroup.PodGroupName,
namespace: namespace,
}
}
// podGroupState holds the runtime state of a pod group.
type podGroupState struct {
lock sync.RWMutex
// allPods tracks all pods belonging to the group that are known to the scheduler.
allPods map[types.UID]*v1.Pod
// unscheduledPods tracks all pods that are unscheduled for this group,
// i.e., are neither assumed nor scheduled.
unscheduledPods sets.Set[types.UID]
// assumedPods tracks pods that have reached the Reserve stage and are waiting
// for the rest of the gang to arrive before being allowed to bind.
assumedPods sets.Set[types.UID]
// assignedPods tracks all pods belonging to the group that are assigned (bound).
assignedPods sets.Set[types.UID]
// schedulingDeadline stores the time at which the gang will time out.
// It is initialized when the first pod from the group enters the Permit stage.
schedulingDeadline *time.Time
}
func newPodGroupState() *podGroupState {
return &podGroupState{
allPods: make(map[types.UID]*v1.Pod),
unscheduledPods: sets.New[types.UID](),
assumedPods: sets.New[types.UID](),
assignedPods: sets.New[types.UID](),
}
}
// addPod adds the pod to this group.
// Depending on the NodeName, it can insert the pod to assignedPods set.
func (pgs *podGroupState) addPod(pod *v1.Pod) {
pgs.lock.Lock()
defer pgs.lock.Unlock()
pgs.allPods[pod.UID] = pod
if pod.Spec.NodeName != "" {
pgs.assignedPods.Insert(pod.UID)
} else {
pgs.unscheduledPods.Insert(pod.UID)
}
}
// updatePod updates the pod in this group.
// In case of binding, it moves the pod to assignedPods.
func (pgs *podGroupState) updatePod(oldPod, newPod *v1.Pod) {
pgs.lock.Lock()
defer pgs.lock.Unlock()
pgs.allPods[newPod.UID] = newPod
if oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != "" {
pgs.assignedPods.Insert(newPod.UID)
// Clear pod from unscheduled and assumed when it is assigned.
pgs.unscheduledPods.Delete(newPod.UID)
pgs.assumedPods.Delete(newPod.UID)
}
}
// deletePod completely deletes the pod from this group.
func (pgs *podGroupState) deletePod(podUID types.UID) {
pgs.lock.Lock()
defer pgs.lock.Unlock()
delete(pgs.allPods, podUID)
pgs.unscheduledPods.Delete(podUID)
pgs.assumedPods.Delete(podUID)
pgs.assignedPods.Delete(podUID)
}
// empty returns true when the group is empty.
func (pgs *podGroupState) empty() bool {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return len(pgs.allPods) == 0
}
// AllPods returns the UIDs of all pods known to the scheduler for this group.
func (pgs *podGroupState) AllPods() sets.Set[types.UID] {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return sets.KeySet(pgs.allPods)
}
// AllPodsCount returns the number of all pods known to the scheduler for this group.
func (pgs *podGroupState) AllPodsCount() int {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return len(pgs.allPods)
}
// UnscheduledPods returns all pods that are unscheduled for this group,
// i.e., are neither assumed nor assigned.
// The returned map type corresponds to the argument of the PodActivator.Activate method.
func (pgs *podGroupState) UnscheduledPods() map[string]*v1.Pod {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
unscheduledPods := make(map[string]*v1.Pod, len(pgs.unscheduledPods))
for podUID := range pgs.unscheduledPods {
pod := pgs.allPods[podUID]
unscheduledPods[pod.Name] = pod
}
return unscheduledPods
}
// AssumedPods returns the UIDs of all pods for this group in the assumed state,
// i.e., passed the Reserve gate.
func (pgs *podGroupState) AssumedPods() sets.Set[types.UID] {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.assumedPods.Clone()
}
// AssignedPods returns the UIDs of all pods already assigned (bound) for this group.
func (pgs *podGroupState) AssignedPods() sets.Set[types.UID] {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return pgs.assignedPods.Clone()
}
// ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned.
func (pgs *podGroupState) ScheduledPodsCount() int {
pgs.lock.RLock()
defer pgs.lock.RUnlock()
return len(pgs.assumedPods) + len(pgs.assignedPods)
}
// SchedulingTimeout returns the remaining time until the pod group scheduling times out.
// A new deadline is created if one doesn't exist, or if the previous one has expired.
func (pgs *podGroupState) SchedulingTimeout() time.Duration {
pgs.lock.Lock()
defer pgs.lock.Unlock()
now := time.Now()
// A new deadline is set if one doesn't exist, or if the old one has passed.
// This allows a new attempt to form a gang after a previous attempt timed out.
if pgs.schedulingDeadline == nil || pgs.schedulingDeadline.Before(now) {
pgs.schedulingDeadline = ptr.To(now.Add(DefaultSchedulingTimeoutDuration))
}
return pgs.schedulingDeadline.Sub(now)
}
// AssumePod marks a pod as having reached the Reserve stage.
func (pgs *podGroupState) AssumePod(podUID types.UID) {
pgs.lock.Lock()
defer pgs.lock.Unlock()
pod := pgs.allPods[podUID]
// A scheduling pod may be removed from the cluster.
// In that case, we just ignore it.
if pod == nil {
return
}
// If the pod is already assigned, put it into assignedPods.
// Otherwise put it to assumedPods.
if pod.Spec.NodeName != "" {
pgs.assignedPods.Insert(podUID)
} else {
pgs.assumedPods.Insert(podUID)
}
pgs.unscheduledPods.Delete(podUID)
}
// ForgetPod removes a pod from the assumed state.
func (pgs *podGroupState) ForgetPod(podUID types.UID) {
pgs.lock.Lock()
defer pgs.lock.Unlock()
pod := pgs.allPods[podUID]
// A scheduling pod may be removed from the cluster.
// In that case, we just ignore it.
if pod == nil {
return
}
pgs.assumedPods.Delete(podUID)
// If the pod is already assigned, put it into assignedPods.
// Otherwise, put it into unscheduledPods.
if pod.Spec.NodeName != "" {
pgs.assignedPods.Insert(podUID)
} else {
pgs.unscheduledPods.Insert(podUID)
}
}

View file

@ -133,10 +133,6 @@ func (sched *Scheduler) addPod(obj interface{}) {
return
}
if sched.PodGroupManager != nil {
// Register pod into pod group manager before adding to the cache or scheduling queue.
sched.PodGroupManager.AddPod(pod)
}
if assignedPod(pod) {
sched.addAssignedPodToCache(pod)
} else if responsibleForPod(pod, sched.Profiles) {
@ -157,10 +153,6 @@ func (sched *Scheduler) updatePod(oldObj, newObj interface{}) {
return
}
if sched.PodGroupManager != nil {
// Update pod in pod group manager before updating it in the cache or scheduling queue.
sched.PodGroupManager.UpdatePod(oldPod, newPod)
}
if assignedPod(oldPod) {
sched.updateAssignedPodInCache(oldPod, newPod)
} else if assignedPod(newPod) {
@ -185,10 +177,6 @@ func (sched *Scheduler) deletePod(obj interface{}) {
switch t := obj.(type) {
case *v1.Pod:
pod = t
if sched.PodGroupManager != nil {
// Delete pod from pod group manager before deleting the pod from cache or scheduling queue.
sched.PodGroupManager.DeletePod(pod)
}
if assignedPod(pod) {
sched.deleteAssignedPodFromCache(pod)
} else if responsibleForPod(pod, sched.Profiles) {
@ -204,10 +192,6 @@ func (sched *Scheduler) deletePod(obj interface{}) {
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
return
}
if sched.PodGroupManager != nil {
// Delete pod from pod group manager before deleting the pod from cache or scheduling queue.
sched.PodGroupManager.DeletePod(pod)
}
// The carried object may be stale, so we don't use it to check if
// it's assigned or not. Attempting to cleanup anyways.
sched.deleteAssignedPodFromCache(pod)
@ -228,6 +212,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(pod *v1.Pod) {
logger := sched.logger
logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod))
sched.Cache.AddPodGroupMember(pod)
sched.SchedulingQueue.Add(klog.NewContext(context.Background(), logger), pod)
if utilfeature.DefaultFeatureGate.Enabled(features.GangScheduling) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnscheduledPodAdd, nil, pod, nil)
@ -313,6 +298,8 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldPod, newPod *v1.Pod) {
_ = sched.syncPodWithDispatcher(newPod)
}
sched.Cache.UpdatePodGroupMember(logger, oldPod, newPod)
isAssumed, err := sched.Cache.IsAssumedPod(newPod)
if err != nil {
utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(newPod))
@ -354,6 +341,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(pod *v1.Pod, inBinding bool
// once the https://github.com/kubernetes/kubernetes/issues/134859 is fixed.
return
}
sched.Cache.RemovePodGroupMember(pod)
isAssumed, err := sched.Cache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(pod))

View file

@ -184,7 +184,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
// disable backoff queue
internalqueue.WithPodInitialBackoffDuration(0),
internalqueue.WithPodMaxBackoffDuration(0))
schedulerCache := internalcache.New(ctx, nil)
schedulerCache := internalcache.New(ctx, nil, false)
// Put test pods into unschedulable queue
for _, pod := range unschedulablePods {
@ -254,7 +254,7 @@ func TestUpdateAssignedPodInCache(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sched := &Scheduler{
Cache: internalcache.New(ctx, nil),
Cache: internalcache.New(ctx, nil, false),
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
logger: logger,
}
@ -795,7 +795,7 @@ func TestAddPod(t *testing.T) {
defer cancel()
sched := &Scheduler{
Cache: internalcache.New(ctx, nil),
Cache: internalcache.New(ctx, nil, false),
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
logger: logger,
Profiles: profile.Map{
@ -930,7 +930,7 @@ func TestUpdatePod(t *testing.T) {
t.Fatalf("Failed to create framework: %v", err)
}
sched := &Scheduler{
Cache: internalcache.New(ctx, nil),
Cache: internalcache.New(ctx, nil, false),
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
logger: logger,
Profiles: profile.Map{
@ -1049,7 +1049,7 @@ func TestDeletePod(t *testing.T) {
t.Fatalf("Failed to create framework: %v", err)
}
sched := &Scheduler{
Cache: internalcache.New(ctx, nil),
Cache: internalcache.New(ctx, nil, false),
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
logger: logger,
Profiles: profile.Map{

View file

@ -332,7 +332,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, false)
for _, name := range test.nodes {
cache.AddNode(logger, createNode(name))
}

View file

@ -33,6 +33,8 @@ var _ fwk.NodeInfoLister = &nodeInfoListerContract{}
var _ fwk.StorageInfoLister = &storageInfoListerContract{}
var _ fwk.SharedLister = &shareListerContract{}
var _ fwk.ResourceSliceLister = &resourceSliceListerContract{}
var _ fwk.PodGroupStateLister = &podGroupStateListerContract{}
var _ fwk.PodGroupState = &podGroupStateContract{}
var _ fwk.DeviceClassLister = &deviceClassListerContract{}
var _ fwk.ResourceClaimTracker = &resourceClaimTrackerContract{}
var _ fwk.DeviceClassResolver = &deviceClassResolverContract{}
@ -72,6 +74,46 @@ func (c *shareListerContract) StorageInfos() fwk.StorageInfoLister {
return nil
}
func (c *shareListerContract) PodGroupStates() fwk.PodGroupStateLister {
return nil
}
type podGroupStateListerContract struct{}
func (c *podGroupStateListerContract) Get(_ string, _ string) (fwk.PodGroupState, error) {
return nil, nil
}
type podGroupStateContract struct{}
func (c *podGroupStateContract) AllPods() sets.Set[types.UID] {
return nil
}
func (c *podGroupStateContract) UnscheduledPods() map[string]*v1.Pod {
return nil
}
func (c *podGroupStateContract) AssumedPods() sets.Set[types.UID] {
return nil
}
func (c *podGroupStateContract) AssignedPods() sets.Set[types.UID] {
return nil
}
func (c *podGroupStateContract) AllPodsCount() int {
return 0
}
func (c *podGroupStateContract) ScheduledPodsCount() int {
return 0
}
func (c *podGroupStateContract) ScheduledPods() []*v1.Pod {
return nil
}
type resourceSliceListerContract struct{}
func (c *resourceSliceListerContract) ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error) {

View file

@ -39,6 +39,8 @@ type CycleState struct {
// GetParallelPreBindPlugins returns plugins that can be run in parallel with other plugins
// in the PreBind extension point.
parallelPreBindPlugins sets.Set[string]
// isPodGroupSchedulingCycle indicates whether this cycle is a pod group scheduling cycle or not.
isPodGroupSchedulingCycle bool
}
// NewCycleState initializes a new CycleState and returns its pointer.
@ -94,6 +96,14 @@ func (c *CycleState) GetParallelPreBindPlugins() sets.Set[string] {
return c.parallelPreBindPlugins
}
func (c *CycleState) IsPodGroupSchedulingCycle() bool {
return c.isPodGroupSchedulingCycle
}
func (c *CycleState) SetPodGroupSchedulingCycle(isPodGroupSchedulingCycle bool) {
c.isPodGroupSchedulingCycle = isPodGroupSchedulingCycle
}
// Clone creates a copy of CycleState and returns its pointer. Clone returns
// nil if the context being cloned is nil.
func (c *CycleState) Clone() fwk.CycleState {
@ -112,6 +122,7 @@ func (c *CycleState) Clone() fwk.CycleState {
copy.skipScorePlugins = c.skipScorePlugins
copy.skipPreBindPlugins = c.skipPreBindPlugins
copy.parallelPreBindPlugins = c.parallelPreBindPlugins
copy.isPodGroupSchedulingCycle = c.isPodGroupSchedulingCycle
return copy
}

View file

@ -94,7 +94,7 @@ func TestDefaultBinder(t *testing.T) {
t.Fatal(err)
}
if asyncAPICallsEnabled {
cache := internalcache.New(ctx, apiDispatcher)
cache := internalcache.New(ctx, apiDispatcher, false)
fh.SetAPICacher(apicache.New(nil, cache))
}

View file

@ -456,7 +456,7 @@ func TestPostFilter(t *testing.T) {
t.Fatal(err)
}
if asyncAPICallsEnabled {
cache := internalcache.New(ctx, apiDispatcher)
cache := internalcache.New(ctx, apiDispatcher, false)
f.SetAPICacher(apicache.New(nil, cache))
}
@ -2183,7 +2183,7 @@ func TestPreempt(t *testing.T) {
defer apiDispatcher.Close()
}
cache := internalcache.New(ctx, apiDispatcher)
cache := internalcache.New(ctx, apiDispatcher, false)
for _, pod := range testPods {
if err := cache.AddPod(logger, pod.DeepCopy()); err != nil {
t.Fatalf("Failed to add pod %s: %v", pod.Name, err)

View file

@ -25,7 +25,6 @@ import (
schedulingapi "k8s.io/api/scheduling/v1alpha2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha2"
"k8s.io/klog/v2"
fwk "k8s.io/kube-scheduler/framework"
@ -38,25 +37,31 @@ import (
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = names.GangScheduling
// permitTimeoutDuration defines how long the gang pods should
// wait at the permit stage for a quorum before being rejected.
permitTimeoutDuration = 5 * time.Minute
)
// GangScheduling is a plugin that enforces "all-or-nothing" scheduling for pods
// belonging to a PodGroup with a Gang scheduling policy.
type GangScheduling struct {
handle fwk.Handle
podGroupLister schedulinglisters.PodGroupLister
handle fwk.Handle
podGroupLister schedulinglisters.PodGroupLister
podGroupManager fwk.PodGroupManager
snapshotLister fwk.SharedLister
}
var _ fwk.EnqueueExtensions = &GangScheduling{}
var _ fwk.PreEnqueuePlugin = &GangScheduling{}
var _ fwk.ReservePlugin = &GangScheduling{}
var _ fwk.PermitPlugin = &GangScheduling{}
// New initializes a new plugin and returns it.
func New(_ context.Context, _ runtime.Object, fh fwk.Handle, fts feature.Features) (fwk.Plugin, error) {
return &GangScheduling{
handle: fh,
podGroupLister: fh.SharedInformerFactory().Scheduling().V1alpha2().PodGroups().Lister(),
handle: fh,
podGroupLister: fh.SharedInformerFactory().Scheduling().V1alpha2().PodGroups().Lister(),
podGroupManager: fh.PodGroupManager(),
snapshotLister: fh.SnapshotSharedLister(),
}, nil
}
@ -137,7 +142,7 @@ func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Stat
return nil
}
podGroupState, err := pl.handle.PodGroupManager().PodGroupState(namespace, schedulingGroup)
podGroupState, err := pl.podGroupManager.PodGroupStates().Get(namespace, *schedulingGroup.PodGroupName)
if err != nil {
return fwk.AsStatus(err)
}
@ -150,35 +155,6 @@ func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Stat
return nil
}
// Reserve is called after a node has been selected for the pod. For gang pods,
// this stage marks the pod as "assumed" in the PodGroupManager,
// contributing to the count of pods ready to be co-scheduled at the Permit stage.
func (pl *GangScheduling) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
if pod.Spec.SchedulingGroup == nil {
return nil
}
podGroupState, err := pl.handle.PodGroupManager().PodGroupState(pod.Namespace, pod.Spec.SchedulingGroup)
if err != nil {
return fwk.AsStatus(err)
}
podGroupState.AssumePod(pod.UID)
return nil
}
// Unreserve removes the gang pod from the "assumed" state in the PodGroupManager,
// ensuring it doesn't count towards the Permit quorum.
func (pl *GangScheduling) Unreserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) {
if pod.Spec.SchedulingGroup == nil {
return
}
podGroupState, err := pl.handle.PodGroupManager().PodGroupState(pod.Namespace, pod.Spec.SchedulingGroup)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Failed to get pod group state", "pod", klog.KObj(pod), "schedulingGroup", pod.Spec.SchedulingGroup)
return
}
podGroupState.ForgetPod(pod.UID)
}
// Permit forces all pods in a gang to wait at this stage. Once the number of waiting (assumed) pods
// reaches the gang's MinCount, all pods in the gang are permitted to proceed to binding simultaneously.
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
@ -203,7 +179,16 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod
return nil, 0
}
podGroupState, err := pl.handle.PodGroupManager().PodGroupState(namespace, schedulingGroup)
// Select a lister for the pod group state based on the currently executed scheduling phase.
// In the pod group scheduling cycle, it reads from the snapshot.
// Otherwise, it reads the runtime state of the pod group from the cache.
var podGroupStateLister fwk.PodGroupStateLister
if state.IsPodGroupSchedulingCycle() {
podGroupStateLister = pl.snapshotLister.PodGroupStates()
} else {
podGroupStateLister = pl.podGroupManager.PodGroupStates()
}
podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName)
if err != nil {
return fwk.AsStatus(err), 0
}
@ -213,7 +198,7 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod
unscheduledPods := podGroupState.UnscheduledPods()
pl.handle.Activate(klog.FromContext(ctx), unscheduledPods)
logger.V(4).Info("Quorum is not met for a gang. Waiting for another pod to allow", "pod", klog.KObj(pod), "schedulingGroup", schedulingGroup, "activatedPods", len(unscheduledPods))
return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), podGroupState.SchedulingTimeout()
return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), permitTimeoutDuration
}
assumedPods := podGroupState.AssumedPods()

View file

@ -24,17 +24,27 @@ import (
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1alpha2"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager"
"k8s.io/kubernetes/pkg/features"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func init() {
// This is required for tests where cache is initialized, and cache attempts to update metrics.
metrics.Register()
}
func Test_isSchedulableAfterPodAdded(t *testing.T) {
tests := []struct {
name string
@ -161,7 +171,6 @@ func TestGangSchedulingFlow(t *testing.T) {
p3 := st.MakePod().Namespace("ns1").Name("p3").UID("p3").PodGroupName("pg1").Obj()
p4 := st.MakePod().Namespace("ns1").Name("p4").UID("p4").PodGroupName("pg2").Obj()
p5 := st.MakePod().Namespace("ns1").Name("p5").UID("p5").PodGroupName("pg2").Obj()
basicPolicyPod := st.MakePod().Namespace("ns1").Name("basic-pod").UID("basic-pod").PodGroupName("pg3").Obj()
@ -169,15 +178,16 @@ func TestGangSchedulingFlow(t *testing.T) {
nonGangPod := st.MakePod().Namespace("ns1").Name("non-gang").UID("non-gang").Obj()
tests := []struct {
name string
pod *v1.Pod
initialPods []*v1.Pod
initialPodGroups []*schedulingapi.PodGroup
podsWaitingOnPermit []*v1.Pod
wantPreEnqueueStatus *fwk.Status
wantPermitStatus *fwk.Status
wantActivatedPods []*v1.Pod
wantAllowedPods []types.UID
name string
pod *v1.Pod
initialPods []*v1.Pod
initialPodGroups []*schedulingapi.PodGroup
podsWaitingOnPermit []*v1.Pod
isDuringPodGroupSchedulingCycle bool
wantPreEnqueueStatus *fwk.Status
wantPermitStatus *fwk.Status
wantActivatedPods []*v1.Pod
wantAllowedPods []types.UID
}{
{
name: "non-gang pod succeeds immediately",
@ -228,23 +238,35 @@ func TestGangSchedulingFlow(t *testing.T) {
wantPermitStatus: nil,
wantAllowedPods: []types.UID{"p1", "p2", "p3"},
},
{
name: "final gang pod arrives at Permit during pod group scheduling cycle",
pod: p1,
initialPods: []*v1.Pod{p2, p3, p4, p5},
initialPodGroups: []*schedulingapi.PodGroup{gangPodGroup1, gangPodGroup2},
podsWaitingOnPermit: []*v1.Pod{p2, p3, p4, p5},
isDuringPodGroupSchedulingCycle: true,
wantPreEnqueueStatus: nil,
wantPermitStatus: nil,
wantAllowedPods: []types.UID{"p1", "p2", "p3"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericWorkload, true)
logger, ctx := ktesting.NewTestContext(t)
manager := podgroupmanager.New(logger)
cache := internalcache.New(ctx, nil, true)
informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(), 0)
podGroupInformer := informerFactory.Scheduling().V1alpha2().PodGroups()
fakeActivator := &podActivatorMock{}
snapshot := internalcache.NewEmptySnapshot()
fh, err := frameworkruntime.NewFramework(ctx, nil, nil,
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodGroupManager(manager),
frameworkruntime.WithPodGroupManager(cache),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithPodActivator(fakeActivator),
frameworkruntime.WithSnapshotSharedLister(snapshot),
)
if err != nil {
t.Fatalf("Failed to create framework: %v", err)
@ -257,10 +279,11 @@ func TestGangSchedulingFlow(t *testing.T) {
t.Fatalf("Failed to add podGroup %s to store: %v", wl.Name, err)
}
}
for _, p := range tt.initialPods {
manager.AddPod(p)
cache.AddPodGroupMember(p)
}
manager.AddPod(tt.pod)
cache.AddPodGroupMember(tt.pod)
p, err := New(ctx, nil, fh, feature.Features{EnableGangScheduling: true})
if err != nil {
@ -279,32 +302,62 @@ func TestGangSchedulingFlow(t *testing.T) {
// Simulate that other pods have already hit Permit and are now waiting.
for _, p := range tt.podsWaitingOnPermit {
// Run Reserve and Permit for these pods to get them into the "assumed" state inside the manager.
status := pl.Reserve(ctx, nil, p, "some-node")
if !status.IsSuccess() {
t.Fatalf("Unexpected Reserve status for pod %q: %v", p.Name, status)
pod := p.DeepCopy()
pod.Spec.NodeName = "some-node"
if err := cache.AssumePod(logger, pod); err != nil {
t.Fatalf("Failed to assume pod %q: %v", pod.Name, err)
}
status, _ = pl.Permit(ctx, nil, p, "some-node")
status, _ := pl.Permit(ctx, schedulerframework.NewCycleState(), pod, "some-node")
if status.Code() != fwk.Wait {
t.Fatalf("Expected Wait status while permitting a pod %q: %v", p.Name, status)
t.Fatalf("Expected Wait status while permitting a pod %q: %v", pod.Name, status)
}
}
status := pl.Reserve(ctx, nil, tt.pod, "some-node")
if !status.IsSuccess() {
t.Fatalf("Unexpected Reserve status: %v", status)
}
// Clear activated pods to assert those activated in tt.pod Permit.
fakeActivator.activatedPods = nil
gotPermitStatus, _ := pl.Permit(ctx, nil, tt.pod, "some-node")
cycleState := schedulerframework.NewCycleState()
cycleState.SetPodGroupSchedulingCycle(tt.isDuringPodGroupSchedulingCycle)
pod := tt.pod.DeepCopy()
pod.Spec.NodeName = "some-node"
// In a pod group scheduling cycle, a snapshot is taken after all
// waiting pods are assumed, so that Permit can read from it.
if tt.isDuringPodGroupSchedulingCycle {
if err := cache.UpdateSnapshot(logger, snapshot); err != nil {
t.Fatalf("Failed to update snapshot: %v", err)
}
podInfo, err := schedulerframework.NewPodInfo(pod)
if err != nil {
t.Fatalf("Failed to create pod info for %q: %v", pod.Name, err)
}
// Assume pod in the snapshot, as in a pod group scheduling cycle.
if err := snapshot.AssumePod(podInfo); err != nil {
t.Fatalf("Failed to assume pod %q in snapshot: %v", pod.Name, err)
}
} else {
// Assume pod in the cache, as in a pod-by-pod scheduling cycle, where Permit reads from cache.
if err := cache.AssumePod(logger, pod); err != nil {
t.Fatalf("Failed to assume pod %q in cache: %v", pod.Name, err)
}
}
gotPermitStatus, _ := pl.Permit(ctx, cycleState, pod, "some-node")
if diff := cmp.Diff(tt.wantPermitStatus, gotPermitStatus); diff != "" {
t.Fatalf("Unexpected Permit status (-want, +got):\n%s", diff)
}
if gotPermitStatus.Code() == fwk.Wait {
// Pod waits for others from a gang. Simulate its eventual Unreserve.
pl.Unreserve(ctx, nil, tt.pod, "some-node")
// Pod waits for others from a gang. Simulate its eventual forget.
if tt.isDuringPodGroupSchedulingCycle {
if err := snapshot.ForgetPod(logger, pod); err != nil {
t.Fatalf("Failed to forget pod %q from snapshot: %v", pod.Name, err)
}
} else {
if err := cache.ForgetPod(logger, pod); err != nil {
t.Fatalf("Failed to forget pod %q from cache: %v", pod.Name, err)
}
}
return
}

View file

@ -602,7 +602,7 @@ func TestPrepareCandidate(t *testing.T) {
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
if asyncAPICallsEnabled {
cache := internalcache.New(ctx, apiDispatcher)
cache := internalcache.New(ctx, apiDispatcher, false)
fwk.SetAPICacher(apicache.New(nil, cache))
}
@ -829,7 +829,7 @@ func TestPrepareCandidateAsyncSetsPreemptingSets(t *testing.T) {
}
informerFactory.Start(ctx.Done())
if asyncAPICallsEnabled {
cache := internalcache.New(ctx, apiDispatcher)
cache := internalcache.New(ctx, apiDispatcher, false)
fwk.SetAPICacher(apicache.New(nil, cache))
}

View file

@ -598,7 +598,7 @@ func TestCallExtenders(t *testing.T) {
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
cache := internalcache.New(ctx, apiDispatcher)
cache := internalcache.New(ctx, apiDispatcher, false)
fwk.SetAPICacher(apicache.New(nil, cache))
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}

View file

@ -82,6 +82,10 @@ func (s sharedLister) StorageInfos() fwk.StorageInfoLister {
return &storageInfoListerContract{}
}
func (s sharedLister) PodGroupStates() fwk.PodGroupStateLister {
return nil
}
var batchRegistry = func() Registry {
r := make(Registry)
err := r.Register("batchTest", newBatchTestPlugin)

View file

@ -262,7 +262,7 @@ func WithAPIDispatcher(apiDispatcher *apidispatcher.APIDispatcher) Option {
}
}
// WithPodGroupManager sets Pod group manager for the scheduling frameworkImpl.
// WithPodGroupManager sets the PodGroupManager for the scheduling frameworkImpl.
func WithPodGroupManager(podGroupManager fwk.PodGroupManager) Option {
return func(o *frameworkOptions) {
o.podGroupManager = podGroupManager

View file

@ -120,7 +120,7 @@ func (sched *Scheduler) podGroupInfoForPod(ctx context.Context, pInfo *framework
logger := klog.FromContext(ctx)
// Get the actual pod group state
podGroupState, err := sched.PodGroupManager.PodGroupState(pInfo.Pod.Namespace, pInfo.Pod.Spec.SchedulingGroup)
podGroupState, err := sched.Cache.PodGroupStates().Get(pInfo.Pod.Namespace, *pInfo.Pod.Spec.SchedulingGroup.PodGroupName)
if err != nil {
return nil, fmt.Errorf("error while retrieving pod group state: %w", err)
}
@ -193,6 +193,9 @@ func initPodSchedulingContext(ctx context.Context, pod *v1.Pod) *podSchedulingCo
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)
// Marks this cycle as a pod group scheduling cycle.
state.SetPodGroupSchedulingCycle(true)
return &podSchedulingContext{
logger: logger,
state: state,
@ -439,6 +442,8 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched
case podGroupResult.status.IsSuccess():
// Pod no longer needs a pod group scheduling cycle. Setting it to false to disable any checks in further functions.
pInfo.NeedsPodGroupScheduling = false
// Disable pod group scheduling in cycle state before binding.
podCtx.state.SetPodGroupSchedulingCycle(false)
// Schedule result is applied for pod and its binding cycle executes.
assumedPodInfo, status := sched.prepareForBindingCycle(ctx, podCtx.state, schedFwk, pInfo, podCtx.podsToActivate, podResult.scheduleResult)
if !status.IsSuccess() {

View file

@ -42,7 +42,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake"
"k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
@ -149,11 +148,11 @@ func TestPodGroupInfoForPod(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
manager := podgroupmanager.New(logger)
cache := internalcache.New(ctx, nil, true)
manager.AddPod(tt.pInfo.Pod)
cache.AddPodGroupMember(tt.pInfo.Pod)
for _, pod := range tt.unscheduledPods {
manager.AddPod(pod)
cache.AddPodGroupMember(pod)
}
q := internalqueue.NewTestQueue(ctx, nil)
@ -164,7 +163,7 @@ func TestPodGroupInfoForPod(t *testing.T) {
}
}
sched := &Scheduler{
PodGroupManager: manager,
Cache: cache,
SchedulingQueue: q,
}
@ -274,7 +273,7 @@ func TestSkipPodGroupPodSchedule(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, true)
registry := frameworkruntime.Registry{
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
@ -362,7 +361,7 @@ func TestPodGroupCycle_UpdateSnapshotError(t *testing.T) {
// Create fake cache that returns error on UpdateSnapshot
updateSnapshotErr := fmt.Errorf("update snapshot error")
cache := &fakecache.Cache{
Cache: internalcache.New(ctx, nil),
Cache: internalcache.New(ctx, nil, true),
UpdateSnapshotFunc: func(nodeSnapshot *internalcache.Snapshot) error {
return updateSnapshotErr
},
@ -795,6 +794,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
client := clientsetfake.NewClientset(testNode)
informerFactory := informers.NewSharedInformerFactory(client, 0)
queue := internalqueue.NewSchedulingQueue(nil, informerFactory)
snapshot := internalcache.NewEmptySnapshot()
registry := []tf.RegisterPluginFunc{
tf.RegisterFilterPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
@ -816,19 +816,19 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(events.NewFakeRecorder(100)),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewEmptySnapshot()),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithPodNominator(queue),
)
if err != nil {
t.Fatalf("Failed to create new framework: %v", err)
}
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, true)
cache.AddNode(logger, testNode)
sched := &Scheduler{
Cache: cache,
nodeInfoSnapshot: internalcache.NewEmptySnapshot(),
nodeInfoSnapshot: snapshot,
SchedulingQueue: queue,
Profiles: profile.Map{"test-scheduler": schedFwk},
}
@ -1236,7 +1236,7 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
t.Fatalf("Failed to create new framework: %v", err)
}
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, true)
cache.AddNode(klog.FromContext(ctx), testNode)
sched := &Scheduler{

View file

@ -65,7 +65,6 @@ import (
apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake"
"k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
@ -1030,20 +1029,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
var gotBinding *v1.Binding
var gotNominatingInfo *fwk.NominatingInfo
var pgm podgroupmanager.PodGroupManager
if scheduleAsPodGroup {
group := &v1.PodSchedulingGroup{
PodGroupName: new("pg"),
}
// When scheduling a pod as a pod group, set scheduling group to all relevant pods.
item.sendPod = withSchedulingGroup(item.sendPod, group)
item.expectErrorPod = withSchedulingGroup(item.expectErrorPod, group)
item.expectPodInBackoffQ = withSchedulingGroup(item.expectPodInBackoffQ, group)
item.expectPodInUnschedulable = withSchedulingGroup(item.expectPodInUnschedulable, group)
pgm = podgroupmanager.New(logger)
pgm.AddPod(item.sendPod)
}
client := clientsetfake.NewClientset(item.sendPod)
informerFactory := informers.NewSharedInformerFactory(client, 0)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
@ -1061,7 +1046,19 @@ func TestSchedulerScheduleOne(t *testing.T) {
defer apiDispatcher.Close()
}
internalCache := internalcache.New(ctx, apiDispatcher)
internalCache := internalcache.New(ctx, apiDispatcher, scheduleAsPodGroup)
if scheduleAsPodGroup {
group := &v1.PodSchedulingGroup{
PodGroupName: new("pg"),
}
// When scheduling a pod as a pod group, set scheduling group to all relevant pods.
item.sendPod = withSchedulingGroup(item.sendPod, group)
item.expectErrorPod = withSchedulingGroup(item.expectErrorPod, group)
item.expectPodInBackoffQ = withSchedulingGroup(item.expectPodInBackoffQ, group)
item.expectPodInUnschedulable = withSchedulingGroup(item.expectPodInUnschedulable, group)
internalCache.AddPodGroupMember(item.sendPod)
}
cache := &fakecache.Cache{
Cache: internalCache,
ForgetFunc: func(pod *v1.Pod) {
@ -1119,7 +1116,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithPodsInPreBind(frameworkruntime.NewPodsInPreBindMap()),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodGroupManager(pgm),
)
if err != nil {
t.Fatal(err)
@ -1138,7 +1134,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
SchedulingQueue: queue,
Profiles: profile.Map{testSchedulerName: schedFramework},
APIDispatcher: apiDispatcher,
PodGroupManager: pgm,
nominatedNodeNameForExpectationEnabled: features.nominatedNodeNameForExpectationEnabled,
}
queue.Add(ctx, item.sendPod)
@ -1717,7 +1712,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) {
defer apiDispatcher.Close()
}
internalCache := internalcache.New(ctx, apiDispatcher)
internalCache := internalcache.New(ctx, apiDispatcher, false)
cache := &fakecache.Cache{
Cache: internalCache,
ForgetFunc: func(pod *v1.Pod) {
@ -1906,7 +1901,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
defer apiDispatcher.Close()
}
scache := internalcache.New(ctx, apiDispatcher)
scache := internalcache.New(ctx, apiDispatcher, false)
firstPod := podWithPort("pod.Name", "", 8080)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
scache.AddNode(logger, &node)
@ -1995,7 +1990,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
defer apiDispatcher.Close()
}
scache := internalcache.New(ctx, apiDispatcher)
scache := internalcache.New(ctx, apiDispatcher, false)
// Design the baseline for the pods, and we will make nodes that don't fit it later.
var cpu = int64(4)
@ -2303,7 +2298,7 @@ func TestSchedulerBinding(t *testing.T) {
if err != nil {
t.Fatal(err)
}
cache := internalcache.New(ctx, apiDispatcher)
cache := internalcache.New(ctx, apiDispatcher, false)
if asyncAPICallsEnabled {
informerFactory := informers.NewSharedInformerFactory(client, 0)
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
@ -3626,7 +3621,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, false)
for _, pod := range test.pods {
cache.AddPod(logger, pod)
}
@ -4240,7 +4235,7 @@ func Test_prioritizeNodes(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, false)
for _, node := range test.nodes {
cache.AddNode(klog.FromContext(ctx), node)
}
@ -4442,7 +4437,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
nodes := makeNodeList([]string{"node1", "node2", "node3"})
client := clientsetfake.NewClientset(test.pod)
informerFactory := informers.NewSharedInformerFactory(client, 0)
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, false)
for _, n := range nodes {
cache.AddNode(logger, n)
}
@ -4525,7 +4520,7 @@ func makeNodeList(nodeNames []string) []*v1.Node {
// makeScheduler makes a simple Scheduler for testing.
func makeScheduler(ctx context.Context, nodes []*v1.Node) *Scheduler {
logger := klog.FromContext(ctx)
cache := internalcache.New(ctx, nil)
cache := internalcache.New(ctx, nil, false)
for _, n := range nodes {
cache.AddNode(logger, n)
}
@ -4689,7 +4684,7 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, clie
t.Cleanup(apiDispatcher.Close)
}
scache := internalcache.New(ctx, apiDispatcher)
scache := internalcache.New(ctx, apiDispatcher, false)
scache.AddNode(logger, &testNode)
informerFactory := informers.NewSharedInformerFactory(client, 0)
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()

View file

@ -44,7 +44,6 @@ import (
apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/backend/cache/debugger"
internalpodgroupmanager "k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
@ -98,9 +97,6 @@ type Scheduler struct {
// framework.APICache should be used instead.
APIDispatcher *apidispatcher.APIDispatcher
// PodGroupManager can be used to provide workload-aware scheduling.
PodGroupManager internalpodgroupmanager.PodGroupManager
// Profiles are the scheduling profiles.
Profiles profile.Map
@ -351,10 +347,8 @@ func New(ctx context.Context,
if feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) {
apiDispatcher = apidispatcher.New(client, int(options.parallelism), apicalls.Relevances)
}
var podGroupManager internalpodgroupmanager.PodGroupManager
if feature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
podGroupManager = internalpodgroupmanager.New(logger)
}
schedulerCache := internalcache.New(ctx, apiDispatcher, feature.DefaultFeatureGate.Enabled(features.GenericWorkload))
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
@ -371,7 +365,7 @@ func New(ctx context.Context,
frameworkruntime.WithPodsInPreBind(podsInPreBind),
frameworkruntime.WithAPIDispatcher(apiDispatcher),
frameworkruntime.WithSharedCSIManager(sharedCSIManager),
frameworkruntime.WithPodGroupManager(podGroupManager),
frameworkruntime.WithPodGroupManager(schedulerCache),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
@ -423,8 +417,6 @@ func New(ctx context.Context,
internalqueue.WithPodSigners(podSigners),
)
schedulerCache := internalcache.New(ctx, apiDispatcher)
var apiCache fwk.APICacher
if apiDispatcher != nil {
apiCache = apicache.New(podQueue, schedulerCache)
@ -452,7 +444,6 @@ func New(ctx context.Context,
logger: logger,
APIDispatcher: apiDispatcher,
nominatedNodeNameForExpectationEnabled: feature.DefaultFeatureGate.Enabled(features.NominatedNodeNameForExpectation),
PodGroupManager: podGroupManager,
}
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()

View file

@ -310,7 +310,7 @@ func TestFailureHandler(t *testing.T) {
recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done())
queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())), internalqueue.WithMetricsRecorder(recorder), internalqueue.WithAPIDispatcher(apiDispatcher))
schedulerCache := internalcache.New(ctx, apiDispatcher)
schedulerCache := internalcache.New(ctx, apiDispatcher, false)
queue.Add(ctx, testPod)
@ -384,7 +384,7 @@ func TestFailureHandler_PodAlreadyBound(t *testing.T) {
}
queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())), internalqueue.WithAPIDispatcher(apiDispatcher))
schedulerCache := internalcache.New(ctx, apiDispatcher)
schedulerCache := internalcache.New(ctx, apiDispatcher, false)
// Add node to schedulerCache no matter it's deleted in API server or not.
schedulerCache.AddNode(logger, &nodeFoo)

View file

@ -89,6 +89,10 @@ type CycleState interface {
// Clone creates a copy of CycleState and returns its pointer. Clone returns
// nil if the context being cloned is nil.
Clone() CycleState
// IsPodGroupSchedulingCycle returns true if this cycle is a pod group scheduling cycle.
IsPodGroupSchedulingCycle() bool
// SetPodGroupSchedulingCycle sets whether this cycle is a pod group scheduling cycle or not.
SetPodGroupSchedulingCycle(bool)
}
// PodGroupCycleState provides a mechanism for plugins that operate on pod groups to store and retrieve arbitrary data.

View file

@ -882,7 +882,7 @@ type Handle interface {
// ProfileName returns the profile name associated to a profile.
ProfileName() string
// PodGroupManager can be used to provide workload-aware scheduling.
// PodGroupManager provides an interface for runtime information about pod groups from scheduler's cache.
PodGroupManager() PodGroupManager
// SignPod creates a PodSignature for a pod.

View file

@ -17,8 +17,6 @@ limitations under the License.
package framework
import (
"time"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
storagev1 "k8s.io/api/storage/v1"
@ -50,6 +48,13 @@ type StorageInfoLister interface {
type SharedLister interface {
NodeInfos() NodeInfoLister
StorageInfos() StorageInfoLister
PodGroupStates() PodGroupStateLister
}
// PodGroupStateLister provides read access to pod group states.
type PodGroupStateLister interface {
// Get returns the PodGroupState of the given pod group.
Get(namespace string, podGroupName string) (PodGroupState, error)
}
type CSINodeLister interface {
@ -146,14 +151,13 @@ type CSIManager interface {
CSINodes() CSINodeLister
}
// PodGroupManager provides an interface for scheduling plugins to provide workload-aware scheduling.
// It acts as the central source of truth for runtime information about pod groups.
// PodGroupManager provides an interface for runtime information about pod groups in the scheduler cache.
type PodGroupManager interface {
// PodGroupState retrieves the runtime state for a specific pod group, identified by pod group's name and namespace.
PodGroupState(namespace string, schedulingGroup *v1.PodSchedulingGroup) (PodGroupState, error)
// PodGroupStates returns the PodGroupStateLister.
PodGroupStates() PodGroupStateLister
}
// PodGroupState provides an interface to view and modify the state of a single pod group.
// PodGroupState provides an interface to view the state of a single pod group.
type PodGroupState interface {
// AllPods returns the UIDs of all pods known to the scheduler for this group.
AllPods() sets.Set[types.UID]
@ -168,13 +172,8 @@ type PodGroupState interface {
AssumedPods() sets.Set[types.UID]
// AssignedPods returns the UIDs of all pods already assigned (bound) for this group.
AssignedPods() sets.Set[types.UID]
// ScheduledPods returns the pods that are either assumed or assigned for this pod group.
ScheduledPods() []*v1.Pod
// ScheduledPodsCount returns the number of pods for this group that are either assumed or assigned.
ScheduledPodsCount() int
// AssumePod marks a pod as having reached the Reserve stage.
AssumePod(podUID types.UID)
// ForgetPod removes a pod from the assumed state.
ForgetPod(podUID types.UID)
// SchedulingTimeout returns the remaining time until the pod group scheduling times out.
// A new deadline is created if one doesn't exist, or if the previous one has expired.
SchedulingTimeout() time.Duration
}

View file

@ -30,7 +30,6 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/backend/podgroupmanager"
"k8s.io/kubernetes/pkg/scheduler/backend/queue"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/integration/util"
@ -393,8 +392,6 @@ func TestPodGroupScheduling(t *testing.T) {
features.GangScheduling: true,
})
podgroupmanager.DefaultSchedulingTimeoutDuration = 5 * time.Second
testCtx := testutils.InitTestSchedulerWithNS(t, "podgroup-scheduling",
// disable backoff
scheduler.WithPodMaxBackoffSeconds(0),