diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index b766ad50594..5d0060b47c5 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -358,8 +358,8 @@ func TestSchedulerWithExtenders(t *testing.T) { } sched.applyDefaultHandlers() - podIgnored := &v1.Pod{} - result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), podIgnored) + podInfoIgnored := queuedPodInfoForPod(&v1.Pod{}) + result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfoIgnored) if test.expectsErr { if err == nil { t.Errorf("Unexpected non-error, result %+v", result) diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index c54165bedc8..e4b03b53398 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -73,7 +73,12 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { if podInfo == nil || podInfo.Pod == nil { return } + sched.scheduleOnePod(ctx, podInfo) +} +// scheduleOnePod does the entire scheduling workflow for a single pod. +func (sched *Scheduler) scheduleOnePod(ctx context.Context, podInfo *framework.QueuedPodInfo) { + logger := klog.FromContext(ctx) pod := podInfo.Pod // TODO(knelasevero): Remove duplicated keys from log entry calls // When contextualized logging hits GA @@ -120,19 +125,29 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { } // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). - go func() { - bindingCycleCtx, cancel := context.WithCancel(ctx) - defer cancel() + go sched.runBindingCycle(ctx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate) +} - metrics.Goroutines.WithLabelValues(metrics.Binding).Inc() - defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec() +// runBindingCycle runs a binding cycle algorithm. +func (sched *Scheduler) runBindingCycle( + ctx context.Context, + state fwk.CycleState, + schedFramework framework.Framework, + scheduleResult ScheduleResult, + assumedPodInfo *framework.QueuedPodInfo, + start time.Time, + podsToActivate *framework.PodsToActivate) { + bindingCycleCtx, cancel := context.WithCancel(ctx) + defer cancel() - status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate) - if !status.IsSuccess() { - sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status) - return - } - }() + metrics.Goroutines.WithLabelValues(metrics.Binding).Inc() + defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec() + + status := sched.bindingCycle(bindingCycleCtx, state, schedFramework, scheduleResult, assumedPodInfo, start, podsToActivate) + if !status.IsSuccess() { + sched.handleBindingCycleError(bindingCycleCtx, state, schedFramework, assumedPodInfo, start, scheduleResult, status) + return + } } var clearNominatedNode = &fwk.NominatingInfo{NominatingMode: fwk.ModeOverride, NominatedNodeName: ""} @@ -146,22 +161,98 @@ func (sched *Scheduler) schedulingCycle( start time.Time, podsToActivate *framework.PodsToActivate, ) (ScheduleResult, *framework.QueuedPodInfo, *fwk.Status) { - logger := klog.FromContext(ctx) + scheduleResult, status := sched.schedulingAlgorithm(ctx, state, schedFramework, podInfo, start) + if !status.IsSuccess() { + return scheduleResult, podInfo, status + } + + assumedPodInfo, status := sched.prepareForBindingCycle(ctx, state, schedFramework, podInfo, podsToActivate, scheduleResult) + if !status.IsSuccess() { + return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, status + } + + return scheduleResult, assumedPodInfo, nil +} + +// prepareForBindingCycle applies the schedule result, generated by the scheduling algorithm, in memory, +// preparing for a binding cycle. +func (sched *Scheduler) prepareForBindingCycle( + ctx context.Context, + state fwk.CycleState, + schedFramework framework.Framework, + podInfo *framework.QueuedPodInfo, + podsToActivate *framework.PodsToActivate, + scheduleResult ScheduleResult, +) (*framework.QueuedPodInfo, *fwk.Status) { + assumedPodInfo, status := sched.assumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult) + if !status.IsSuccess() { + return assumedPodInfo, status + } + assumedPod := assumedPodInfo.Pod + + // Run "permit" plugins. + runPermitStatus := schedFramework.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() { + // trigger un-reserve plugins to clean up state associated with the reserved Pod + err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "Unreserve and forget failed") + } + + if runPermitStatus.IsRejected() { + fitErr := &framework.FitError{ + NumAllNodes: 1, + Pod: podInfo.Pod, + Diagnosis: framework.Diagnosis{ + NodeToStatus: framework.NewDefaultNodeToStatus(), + }, + } + fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, runPermitStatus) + fitErr.Diagnosis.AddPluginStatus(runPermitStatus) + return assumedPodInfo, fwk.NewStatus(runPermitStatus.Code()).WithError(fitErr) + } + + return assumedPodInfo, runPermitStatus + } + + // At the end of a successful scheduling cycle, pop and move up Pods if needed. + if len(podsToActivate.Map) != 0 { + logger := klog.FromContext(ctx) + sched.SchedulingQueue.Activate(logger, podsToActivate.Map) + // Clear the entries after activation. + podsToActivate.Map = make(map[string]*v1.Pod) + } + + return assumedPodInfo, nil +} + +// schedulingAlgorithm runs fitering and scoring phases for a single pod, +// together with post filter when the pod is unschedulable. +func (sched *Scheduler) schedulingAlgorithm( + ctx context.Context, + state fwk.CycleState, + schedFramework framework.Framework, + podInfo *framework.QueuedPodInfo, + start time.Time, +) (ScheduleResult, *fwk.Status) { + defer func() { + metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) + }() + pod := podInfo.Pod - scheduleResult, err := sched.SchedulePod(ctx, schedFramework, state, pod) + + logger := klog.FromContext(ctx) + scheduleResult, err := sched.SchedulePod(ctx, schedFramework, state, podInfo) if err != nil { - defer func() { - metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) - }() if err == ErrNoNodesAvailable { status := fwk.NewStatus(fwk.UnschedulableAndUnresolvable).WithError(err) - return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status + return ScheduleResult{nominatingInfo: clearNominatedNode}, status } fitError, ok := err.(*framework.FitError) if !ok { logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod)) - return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, fwk.AsStatus(err) + return ScheduleResult{nominatingInfo: clearNominatedNode}, fwk.AsStatus(err) } // SchedulePod() may have failed because the pod would not fit on any host, so we try to @@ -171,7 +262,7 @@ func (sched *Scheduler) schedulingCycle( if !schedFramework.HasPostFilterPlugins() { logger.V(3).Info("No PostFilter plugins are registered, so no preemption will be performed") - return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, fwk.NewStatus(fwk.Unschedulable).WithError(err) + return ScheduleResult{nominatingInfo: clearNominatedNode}, fwk.NewStatus(fwk.Unschedulable).WithError(err) } // Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle. @@ -188,81 +279,72 @@ func (sched *Scheduler) schedulingCycle( if result != nil { nominatingInfo = result.NominatingInfo } - return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, fwk.NewStatus(fwk.Unschedulable).WithError(err) + return ScheduleResult{nominatingInfo: nominatingInfo}, fwk.NewStatus(fwk.Unschedulable).WithError(err) } + return scheduleResult, nil +} - metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) +// assumeAndReserve assumes and reserves the pod in scheduler's memory. +func (sched *Scheduler) assumeAndReserve( + ctx context.Context, + state fwk.CycleState, + schedFramework framework.Framework, + podInfo *framework.QueuedPodInfo, + scheduleResult ScheduleResult, +) (*framework.QueuedPodInfo, *fwk.Status) { + logger := klog.FromContext(ctx) // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. // This allows us to keep scheduling without waiting on binding to occur. assumedPodInfo := podInfo.DeepCopy() assumedPod := assumedPodInfo.Pod // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost - err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost) + err := sched.assume(logger, assumedPodInfo, scheduleResult.SuggestedHost) if err != nil { // This is most probably result of a BUG in retrying logic. // We report an error here so that pod scheduling can be retried. // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). - return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, fwk.AsStatus(err) + return assumedPodInfo, fwk.AsStatus(err) } // Run the Reserve method of reserve plugins. if sts := schedFramework.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { // trigger un-reserve to clean up state associated with the reserved Pod - schedFramework.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost) - if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil { - utilruntime.HandleErrorWithContext(ctx, forgetErr, "Scheduler cache ForgetPod failed") + err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "Unreserve and forget failed") } if sts.IsRejected() { fitErr := &framework.FitError{ NumAllNodes: 1, - Pod: pod, + Pod: podInfo.Pod, Diagnosis: framework.Diagnosis{ NodeToStatus: framework.NewDefaultNodeToStatus(), }, } fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, sts) fitErr.Diagnosis.AddPluginStatus(sts) - return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, fwk.NewStatus(sts.Code()).WithError(fitErr) + return assumedPodInfo, fwk.NewStatus(sts.Code()).WithError(fitErr) } - return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts + return assumedPodInfo, sts } + return assumedPodInfo, nil +} - // Run "permit" plugins. - runPermitStatus := schedFramework.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) - if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() { - // trigger un-reserve to clean up state associated with the reserved Pod - schedFramework.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost) - if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil { - utilruntime.HandleErrorWithContext(ctx, forgetErr, "Scheduler cache ForgetPod failed") - } +// unreserveAndForget unreserves and forgets the pod from scheduler's memory. +func (sched *Scheduler) unreserveAndForget( + ctx context.Context, + state fwk.CycleState, + schedFramework framework.Framework, + assumedPodInfo *framework.QueuedPodInfo, + nodeName string, +) error { + logger := klog.FromContext(ctx) - if runPermitStatus.IsRejected() { - fitErr := &framework.FitError{ - NumAllNodes: 1, - Pod: pod, - Diagnosis: framework.Diagnosis{ - NodeToStatus: framework.NewDefaultNodeToStatus(), - }, - } - fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, runPermitStatus) - fitErr.Diagnosis.AddPluginStatus(runPermitStatus) - return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, fwk.NewStatus(runPermitStatus.Code()).WithError(fitErr) - } - - return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus - } - - // At the end of a successful scheduling cycle, pop and move up Pods if needed. - if len(podsToActivate.Map) != 0 { - sched.SchedulingQueue.Activate(logger, podsToActivate.Map) - // Clear the entries after activation. - podsToActivate.Map = make(map[string]*v1.Pod) - } - - return scheduleResult, assumedPodInfo, nil + schedFramework.RunReservePluginsUnreserve(ctx, state, assumedPodInfo.Pod, nodeName) + return sched.Cache.ForgetPod(logger, assumedPodInfo.Pod) } // bindingCycle tries to bind an assumed Pod. @@ -370,9 +452,8 @@ func (sched *Scheduler) handleBindingCycleError( assumedPod := podInfo.Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost) - if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil { - utilruntime.HandleErrorWithContext(ctx, forgetErr, "scheduler cache ForgetPod failed") + if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil { + utilruntime.HandleErrorWithContext(ctx, forgetErr, "Unreserve and forget failed") } else { // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, // as the assumed Pod had occupied a certain amount of resources in scheduler cache. @@ -423,7 +504,8 @@ func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Frame // schedulePod tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. // If it fails, it will return a FitError with reasons. -func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { +func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (result ScheduleResult, err error) { + pod := podInfo.Pod trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil { @@ -435,7 +517,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework return result, ErrNoNodesAvailable } - feasibleNodes, diagnosis, nodeHint, signature, err := sched.findNodesThatFitPod(ctx, fwk, state, pod) + feasibleNodes, diagnosis, nodeHint, signature, err := sched.findNodesThatFitPod(ctx, fwk, state, podInfo) if err != nil { return result, err } @@ -484,11 +566,12 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework // Filters the nodes to find the ones that fit the pod based on the framework // filter plugins and filter extenders. -func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, schedFramework framework.Framework, state fwk.CycleState, pod *v1.Pod) ([]fwk.NodeInfo, framework.Diagnosis, string, fwk.PodSignature, error) { +func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, schedFramework framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) ([]fwk.NodeInfo, framework.Diagnosis, string, fwk.PodSignature, error) { logger := klog.FromContext(ctx) diagnosis := framework.Diagnosis{ NodeToStatus: framework.NewDefaultNodeToStatus(), } + pod := podInfo.Pod allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List() if err != nil { @@ -957,21 +1040,20 @@ func (h *nodeScoreHeap) Pop() interface{} { } // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous. -// assume modifies `assumed`. -func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error { +func (sched *Scheduler) assume(logger klog.Logger, assumedPodInfo *framework.QueuedPodInfo, host string) error { // Optimistically assume that the binding will succeed and send it to apiserver // in the background. // If the binding fails, scheduler will release resources allocated to assumed pod // immediately. - assumed.Spec.NodeName = host + assumedPodInfo.Pod.Spec.NodeName = host - if err := sched.Cache.AssumePod(logger, assumed); err != nil { + if err := sched.Cache.AssumePod(logger, assumedPodInfo.Pod); err != nil { logger.Error(err, "Scheduler cache AssumePod failed") return err } // if "assumed" is a nominated pod, we should remove it from internal cache if sched.SchedulingQueue != nil { - sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) + sched.SchedulingQueue.DeleteNominatedPodIfExists(assumedPodInfo.Pod) } return nil @@ -1032,7 +1114,7 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string { // handleSchedulingFailure records an event for the pod that indicates the // pod has failed to schedule. Also, update the pod condition and nominated node name if set. -func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *fwk.Status, nominatingInfo *fwk.NominatingInfo, start time.Time) { +func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, podFwk framework.Framework, podInfo *framework.QueuedPodInfo, status *fwk.Status, nominatingInfo *fwk.NominatingInfo, start time.Time) { calledDone := false defer func() { if !calledDone { @@ -1051,9 +1133,9 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo switch reason { case v1.PodReasonUnschedulable: - metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) + metrics.PodUnschedulable(podFwk.ProfileName(), metrics.SinceInSeconds(start)) case v1.PodReasonSchedulerError: - metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) + metrics.PodScheduleError(podFwk.ProfileName(), metrics.SinceInSeconds(start)) } pod := podInfo.Pod @@ -1077,7 +1159,7 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo } // Check if the Pod exists in informer cache. - podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister() + podLister := podFwk.SharedInformerFactory().Core().V1().Pods().Lister() cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name) if e != nil { logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e) @@ -1114,8 +1196,8 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo } msg := truncateMessage(errMsg) - fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) - if err := updatePod(ctx, sched.client, fwk.APICacher(), pod, &v1.PodCondition{ + podFwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + if err := updatePod(ctx, sched.client, podFwk.APICacher(), pod, &v1.PodCondition{ Type: v1.PodScheduled, ObservedGeneration: podutil.CalculatePodConditionObservedGeneration(&pod.Status, pod.Generation, v1.PodScheduled), Status: v1.ConditionFalse, diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 0825de23638..ff1f7e56dca 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -1072,7 +1072,7 @@ func TestSchedulerScheduleOne(t *testing.T) { } queue.Add(logger, item.sendPod) - sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) { + sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) { return item.mockScheduleResult, item.injectSchedulingError } sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *fwk.NominatingInfo, start time.Time) { @@ -1647,7 +1647,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) { } queue.Add(logger, item.sendPod) - sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) { + sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) { return item.mockScheduleResult, item.injectSchedulingError } sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) { @@ -3542,7 +3542,8 @@ func TestSchedulerSchedulePod(t *testing.T) { informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) - result, err := sched.SchedulePod(ctx, schedFramework, framework.NewCycleState(), test.pod) + podInfo := queuedPodInfoForPod(test.pod) + result, err := sched.SchedulePod(ctx, schedFramework, framework.NewCycleState(), podInfo) if err != test.wErr { gotFitErr, gotOK := err.(*framework.FitError) wantFitErr, wantOK := test.wErr.(*framework.FitError) @@ -3595,7 +3596,8 @@ func TestFindFitAllError(t *testing.T) { t.Fatal(err) } - _, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), &v1.Pod{}) + podInfo := queuedPodInfoForPod(&v1.Pod{}) + _, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), podInfo) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -3641,7 +3643,8 @@ func TestFindFitSomeError(t *testing.T) { } pod := st.MakePod().Name("1").UID("1").Obj() - _, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), pod) + podInfo := queuedPodInfoForPod(pod) + _, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), podInfo) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -3736,7 +3739,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) { } schedFramework.AddNominatedPod(logger, podinfo, &fwk.NominatingInfo{NominatingMode: fwk.ModeOverride, NominatedNodeName: "1"}) - _, _, _, _, err = scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), test.pod) + podInfo := queuedPodInfoForPod(test.pod) + _, _, _, _, err = scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), podInfo) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -4259,7 +4263,8 @@ func TestFairEvaluationForNodes(t *testing.T) { // Iterating over all nodes more than twice for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ { - nodesThatFit, _, _, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{}) + podInfo := queuedPodInfoForPod(&v1.Pod{}) + nodesThatFit, _, _, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), podInfo) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -4343,7 +4348,8 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { } sched.applyDefaultHandlers() - _, _, _, _, err = sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod) + podInfo := queuedPodInfoForPod(test.pod) + _, _, _, _, err = sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), podInfo) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -4603,3 +4609,11 @@ func pop(queue clientcache.Queue) interface{} { } return obj } + +func queuedPodInfoForPod(pod *v1.Pod) *framework.QueuedPodInfo { + return &framework.QueuedPodInfo{ + PodInfo: &framework.PodInfo{ + Pod: pod, + }, + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 80795a756c3..16c7da3802a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -84,7 +84,7 @@ type Scheduler struct { // SchedulePod tries to schedule the given pod to one of the nodes in the node list. // Return a struct of ScheduleResult with the name of suggested host on success, // otherwise will return a FitError with reasons. - SchedulePod func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) + SchedulePod func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) // Close this to shut down the scheduler. StopEverything <-chan struct{} diff --git a/test/integration/scheduler/queueing/queue_test.go b/test/integration/scheduler/queueing/queue_test.go index 11fe79c4640..b11286c502b 100644 --- a/test/integration/scheduler/queueing/queue_test.go +++ b/test/integration/scheduler/queueing/queue_test.go @@ -333,7 +333,7 @@ func TestCustomResourceEnqueue(t *testing.T) { t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) } // Schedule the Pod manually. - _, fitError := testCtx.Scheduler.SchedulePod(ctx, schedFramework, framework.NewCycleState(), podInfo.Pod) + _, fitError := testCtx.Scheduler.SchedulePod(ctx, schedFramework, framework.NewCycleState(), podInfo) // The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin. if fitError == nil { t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)