Merge pull request #139130 from antekjb/ReturnResultForAllPodsFromScheduling

Ensure that podGroupAlgorithmResult contains result for all pods from podGroup
This commit is contained in:
Kubernetes Prow Robot 2026-05-25 14:03:22 +05:30 committed by GitHub
commit 122e9166ae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 211 additions and 16 deletions

View file

@ -229,11 +229,16 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
result := podGroupAlgorithmResult{
status: fwk.AsStatus(err),
}
// Ensure podResults has an entry for each pod in the pod group with Error status.
result = completePodGroupAlgorithmResult(ctx, podGroupInfo, podGroupCycleState, runAllPostFilters, result)
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, result, start)
return
}
result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, runAllPostFilters)
// Ensure podResults has an entry for each pod in the pod group with a status.
result = completePodGroupAlgorithmResult(ctx, podGroupInfo, podGroupCycleState, runAllPostFilters, result)
metrics.PodGroupSchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// Run workload aware preemption if required. If the preemption is successful,
@ -490,7 +495,29 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
}, revertFn
}
// completePodGroupAlgorithmResult returns podGroupResult with podResults resized to hold exactly
// one entry per pod in podGroupInfo.QueuedPodInfos.
//
// A result that already has the right length is returned untouched. Otherwise the existing
// entries are kept in order (surplus entries beyond the queue length are dropped), and every
// queued pod without an entry receives a freshly initialized scheduling context together with
// a clone of the group-level status.
func completePodGroupAlgorithmResult(ctx context.Context, podGroupInfo *framework.QueuedPodGroupInfo, podGroupState *framework.CycleState, postFilterMode podGroupPostFilterMode, podGroupResult podGroupAlgorithmResult) podGroupAlgorithmResult {
	have, want := len(podGroupResult.podResults), len(podGroupInfo.QueuedPodInfos)
	if have == want {
		return podGroupResult
	}

	// Carry over at most `want` of the already computed results.
	kept := podGroupResult.podResults
	if have > want {
		kept = kept[:want]
	}
	completed := make([]algorithmResult, 0, want)
	completed = append(completed, kept...)

	// Synthesize an entry for each pod the algorithm did not report on.
	for _, queued := range podGroupInfo.QueuedPodInfos[len(completed):] {
		completed = append(completed, algorithmResult{
			podCtx: initPodSchedulingContext(ctx, queued.Pod, podGroupState, postFilterMode),
			status: podGroupResult.status.Clone(),
		})
	}

	podGroupResult.podResults = completed
	return podGroupResult
}
// submitPodGroupAlgorithmResult submits the result of the pod group scheduling algorithm.
// It assumes that podGroupResult contains results for all pods from the pod group.
// If it does not, podGroupCondition will be updated to reflect the error.
// If that algorithm succeeded, the schedulable pods proceed to the binding cycle.
// Unschedulable pods are moved back to the scheduling queue and need to wait
// for the next pod group scheduling cycle.
@ -499,19 +526,16 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, schedFwk framework.Framework, podGroupState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podGroupResult podGroupAlgorithmResult, start time.Time) {
logger := klog.FromContext(ctx)
if len(podGroupResult.podResults) != len(podGroupInfo.QueuedPodInfos) {
// This should never happen, but if it does, complete the result with the error status.
logger.Error(fmt.Errorf("some pods were not processed"), "scheduling error for pod group", "podGroup", klog.KObj(podGroupInfo))
podGroupResult.status = fwk.NewStatus(fwk.Error, "scheduling error for pod group, some pods were not processed")
podGroupResult.podResults = nil
podGroupResult = completePodGroupAlgorithmResult(ctx, podGroupInfo, podGroupState, runAllPostFilters, podGroupResult)
}
var scheduledPods, unschedulablePods int
for i, pInfo := range podGroupInfo.QueuedPodInfos {
var podResult algorithmResult
if len(podGroupResult.podResults) > i {
podResult = podGroupResult.podResults[i]
} else {
// In pod group-level unschedulable or error cases, podResult may not be defined.
// Initialize it now to handle pod failure correctly.
podResult = algorithmResult{
podCtx: initPodSchedulingContext(ctx, pInfo.Pod, podGroupState, runAllPostFilters),
status: podGroupResult.status.Clone(),
}
}
podResult := podGroupResult.podResults[i]
podCtx := podResult.podCtx
ctx := klog.NewContext(ctx, podCtx.logger)
// To be consistent with pod-by-pod scheduling, construct pod scheduling start time as `now - scheduling duration`.
@ -776,14 +800,13 @@ func makeProposedAssignments(res *podGroupAlgorithmResult) []fwk.ProposedAssignm
return proposedAssignments
}
// podGroupSchedulingAlgorithm attempts to schedule pods in the pod group according to the policy
// and constraints. It returns the scheduling result for all evaluated pods in the pod group,
// which is not necessarily every pod in the pod group.
//
// When the TopologyAwareWorkloadScheduling feature gate is enabled the placement algorithm is
// used; otherwise the default algorithm is used. Either one runs under a derived context that is
// cancelled when this function returns.
func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
	podGroupCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareWorkloadScheduling) {
		return sched.podGroupSchedulingPlacementAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode)
	}
	return sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode)
}

View file

@ -424,6 +424,141 @@ func TestPodGroupCycle_UpdateSnapshotError(t *testing.T) {
}
}
// TestPodGroupCycle_FillsPodResultsOnFewerResults checks that podGroupCycle reports every pod of
// a pod group to the FailureHandler even when podGroupSchedulingAlgorithm returns fewer pod
// results than there are queued pods.
//
// The fake plugin returns a Filter error for p2. The test first asserts that the algorithm
// yields only two results for the three queued pods, then runs the full cycle and expects p2 to
// fail with its own filter error while p1 and p3 fail with the group-level error.
func TestPodGroupCycle_FillsPodResultsOnFewerResults(t *testing.T) {
	testPodGroup := st.MakePodGroup().Name("pg").Namespace("default").Obj()
	p1 := st.MakePod().Name("p1").UID("p1").PodGroupName("pg").SchedulerName("test-scheduler").Obj()
	p2 := st.MakePod().Name("p2").UID("p2").PodGroupName("pg").SchedulerName("test-scheduler").Obj()
	p3 := st.MakePod().Name("p3").UID("p3").PodGroupName("pg").SchedulerName("test-scheduler").Obj()
	testNode := st.MakeNode().Name("node1").UID("node1").Obj()

	qInfo1 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p1}}
	qInfo2 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p2}}
	qInfo3 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p3}}
	podGroupInfo := &framework.QueuedPodGroupInfo{
		QueuedPodInfos: []*framework.QueuedPodInfo{qInfo1, qInfo2, qInfo3},
		PodGroupInfo: &framework.PodGroupInfo{
			Name:            "pg",
			Namespace:       "default",
			UnscheduledPods: []*v1.Pod{p1, p2, p3},
		},
	}

	// Create the logger and the context once, so that everything below shares a single
	// context that is cancelled when the test returns.
	logger, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	fakePlugin := &fakePodGroupPlugin{
		filterStatus: map[string]*fwk.Status{
			"p1": nil,
			"p2": fwk.NewStatus(fwk.Error, "filter error for p2"),
			"p3": nil,
		},
		permitStatus: map[string]*fwk.Status{
			"p1": nil,
			"p2": nil,
			"p3": nil,
		},
	}

	// The same fake plugin instance is registered at the PostFilter, Permit and Filter
	// extension points.
	registry := []tf.RegisterPluginFunc{
		tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		tf.RegisterPostFilterPlugin(fakePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
			return fakePlugin, nil
		}),
		tf.RegisterPermitPlugin(fakePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
			return fakePlugin, nil
		}),
		tf.RegisterFilterPlugin(fakePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
			return fakePlugin, nil
		}),
	}

	client := clientsetfake.NewSimpleClientset(testPodGroup, testNode)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	podGroupLister := informerFactory.Scheduling().V1alpha3().PodGroups().Lister()
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())

	queue := internalqueue.NewSchedulingQueue(nil, informerFactory)
	snapshot := internalcache.NewEmptySnapshot()
	schedFwk, err := tf.NewFramework(ctx, registry, "test-scheduler",
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithPodNominator(queue),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithEventRecorder(events.NewFakeRecorder(100)),
	)
	if err != nil {
		t.Fatalf("Failed to create new framework: %v", err)
	}

	cache := internalcache.New(ctx, nil, true)
	cache.AddNode(logger, testNode)

	// handledPods records the status passed to FailureHandler per pod name; lock guards it.
	handledPods := make(map[string]*fwk.Status)
	var lock sync.Mutex

	sched := &Scheduler{
		Profiles:                       profile.Map{"test-scheduler": schedFwk},
		SchedulingQueue:                internalqueue.NewTestQueue(ctx, nil),
		Cache:                          cache,
		client:                         client,
		podGroupLister:                 podGroupLister,
		nodeInfoSnapshot:               internalcache.NewEmptySnapshot(),
		workloadAwarePreemptionEnabled: false,
		// Unused parameters are blanked so that none of them shadows the fwk package.
		FailureHandler: func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
			lock.Lock()
			defer lock.Unlock()
			handledPods[p.Pod.Name] = status
		},
	}

	// Checking that scheduling algorithm is returning shorter list
	if err := sched.Cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot); err != nil {
		t.Fatalf("Failed to update snapshot: %v", err)
	}
	sched.SchedulePod = sched.schedulePod
	schedulePodResult := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, framework.NewCycleState(), podGroupInfo, runAllPostFilters)
	if len(schedulePodResult.podResults) != 2 {
		t.Errorf("Expected 2 pod results, got %d", len(schedulePodResult.podResults))
	}

	// Run the scheduling cycle and check that all pods are handled.
	sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo)

	lock.Lock()
	defer lock.Unlock()

	if len(handledPods) != 3 {
		t.Errorf("Expected FailureHandler to be called for 3 pods, but got called for %d", len(handledPods))
	}

	expectedGroupErrMsg := "failed to schedule other pod from a pod group: running \"FakePodGroupPlugin\" filter plugin: filter error for p2"
	expectedP2ErrMsg := "running \"FakePodGroupPlugin\" filter plugin: filter error for p2"

	if status, ok := handledPods["p1"]; !ok {
		t.Errorf("Expected FailureHandler to be called for p1")
	} else if status.AsError() == nil || status.AsError().Error() != expectedGroupErrMsg {
		t.Errorf("Expected status error for p1 to be %q, got %v", expectedGroupErrMsg, status.AsError())
	}

	if status, ok := handledPods["p2"]; !ok {
		t.Errorf("Expected FailureHandler to be called for p2")
	} else if status.AsError() == nil || status.AsError().Error() != expectedP2ErrMsg {
		t.Errorf("Expected status error for p2 to be %q, got %v", expectedP2ErrMsg, status.AsError())
	}

	if status, ok := handledPods["p3"]; !ok {
		t.Errorf("Expected FailureHandler to be called for p3")
	} else if status.AsError() == nil || status.AsError().Error() != expectedGroupErrMsg {
		t.Errorf("Expected status error for p3 to be %q, got %v", expectedGroupErrMsg, status.AsError())
	}
}
func TestPodGroupCycle_PodGroupPostFilter(t *testing.T) {
tests := []struct {
name string
@ -1391,8 +1526,17 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
{
name: "Unschedulable for the entire pod group",
algorithmResult: podGroupAlgorithmResult{
status: fwk.NewStatus(fwk.Unschedulable, "node affinity mismatch"),
podResults: []algorithmResult{},
status: fwk.NewStatus(fwk.Unschedulable, "node affinity mismatch"),
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Unschedulable),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Unschedulable),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Unschedulable),
}},
},
expectBound: sets.New[string](),
expectFailed: sets.New("p1", "p2", "p3"),
@ -1414,6 +1558,9 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error, "plugin returned error"),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error, "plugin returned error"),
}},
},
expectBound: sets.New[string](),
@ -1440,6 +1587,9 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error, "internal failure"),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error, "internal failure"),
}},
},
expectBound: sets.New[string](),
@ -1546,6 +1696,9 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error),
}},
},
expectBound: sets.New[string](),
@ -1557,6 +1710,25 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
Message: "All pods scheduled",
},
},
{
name: "Different number of pods in result and queue, should fail all queue pods",
algorithmResult: podGroupAlgorithmResult{
status: fwk.NewStatus(fwk.Error),
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}},
},
expectBound: sets.New[string](),
expectFailed: sets.New("p1", "p2", "p3"),
expectCondition: &metav1.Condition{
Type: schedulingapi.PodGroupScheduled,
Status: metav1.ConditionFalse,
Reason: schedulingapi.PodGroupReasonSchedulerError,
Message: fwk.NewStatus(fwk.Error, "scheduling error for pod group, some pods were not processed").AsError().Error(),
},
},
}
for _, tt := range tests {