Mirror of https://github.com/kubernetes/kubernetes.git
Refactor ScheduleOne functions for Workload Scheduling Cycle
Parent: a7b940cde2
Commit: 1333aa577a
5 changed files with 186 additions and 90 deletions
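For orientation: the hunks below split the old monolithic ScheduleOne into smaller steps — scheduleOnePod drives the scheduling cycle (schedulingAlgorithm, then prepareForBindingCycle, which wraps assumeAndReserve and the permit plugins) and then launches runBindingCycle asynchronously. What follows is a minimal, self-contained sketch of that call flow under the same function names; every type, signature, and body here is a simplified stand-in for illustration only, not the real scheduler code.

// Sketch only: real types live in k8s.io/kubernetes/pkg/scheduler and the
// scheduler framework; they are reduced to empty stand-ins so the control
// flow of the refactor is visible in isolation.
package main

import "context"

type queuedPodInfo struct{ name string }
type scheduleResult struct{ suggestedHost string }

type scheduler struct{}

// ScheduleOne pops one pod and hands it to scheduleOnePod (as in the diff).
func (s *scheduler) ScheduleOne(ctx context.Context, podInfo *queuedPodInfo) {
	if podInfo == nil {
		return
	}
	s.scheduleOnePod(ctx, podInfo)
}

// scheduleOnePod runs the scheduling cycle synchronously, then starts the
// binding cycle asynchronously.
func (s *scheduler) scheduleOnePod(ctx context.Context, podInfo *queuedPodInfo) {
	result, assumed, err := s.schedulingCycle(ctx, podInfo)
	if err != nil {
		return // the real code reports the failure via handleSchedulingFailure
	}
	go s.runBindingCycle(ctx, result, assumed)
}

// schedulingCycle = scheduling algorithm + preparation for binding.
func (s *scheduler) schedulingCycle(ctx context.Context, podInfo *queuedPodInfo) (scheduleResult, *queuedPodInfo, error) {
	res, err := s.schedulingAlgorithm(ctx, podInfo) // filtering, scoring, post-filter
	if err != nil {
		return scheduleResult{}, podInfo, err
	}
	assumed, err := s.prepareForBindingCycle(ctx, podInfo, res) // assumeAndReserve + permit plugins
	if err != nil {
		return scheduleResult{}, assumed, err
	}
	return res, assumed, nil
}

func (s *scheduler) schedulingAlgorithm(ctx context.Context, podInfo *queuedPodInfo) (scheduleResult, error) {
	return scheduleResult{suggestedHost: "node-1"}, nil // placeholder result
}

func (s *scheduler) prepareForBindingCycle(ctx context.Context, podInfo *queuedPodInfo, res scheduleResult) (*queuedPodInfo, error) {
	return podInfo, nil // assume, reserve, and permit plugins in the real code
}

func (s *scheduler) runBindingCycle(ctx context.Context, res scheduleResult, assumed *queuedPodInfo) {
	// bindingCycle + handleBindingCycleError in the real code
}

func main() {
	// Toy driver; the binding goroutine may not finish before exit in this sketch.
	(&scheduler{}).ScheduleOne(context.Background(), &queuedPodInfo{name: "demo"})
}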
@@ -358,8 +358,8 @@ func TestSchedulerWithExtenders(t *testing.T) {
}
sched.applyDefaultHandlers()

podIgnored := &v1.Pod{}
result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), podIgnored)
podInfoIgnored := queuedPodInfoForPod(&v1.Pod{})
result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfoIgnored)
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)
@@ -73,7 +73,12 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) {
if podInfo == nil || podInfo.Pod == nil {
return
}
sched.scheduleOnePod(ctx, podInfo)
}

// scheduleOnePod does the entire scheduling workflow for a single pod.
func (sched *Scheduler) scheduleOnePod(ctx context.Context, podInfo *framework.QueuedPodInfo) {
logger := klog.FromContext(ctx)
pod := podInfo.Pod
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
@@ -120,19 +125,29 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) {
}

// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
go sched.runBindingCycle(ctx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
}

metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
// runBindingCycle runs a binding cycle algorithm.
func (sched *Scheduler) runBindingCycle(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()

status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
return
}
}()
metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()

status := sched.bindingCycle(bindingCycleCtx, state, schedFramework, scheduleResult, assumedPodInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, schedFramework, assumedPodInfo, start, scheduleResult, status)
return
}
}

var clearNominatedNode = &fwk.NominatingInfo{NominatingMode: fwk.ModeOverride, NominatedNodeName: ""}
@@ -146,22 +161,98 @@ func (sched *Scheduler) schedulingCycle(
start time.Time,
podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *fwk.Status) {
logger := klog.FromContext(ctx)
scheduleResult, status := sched.schedulingAlgorithm(ctx, state, schedFramework, podInfo, start)
if !status.IsSuccess() {
return scheduleResult, podInfo, status
}

assumedPodInfo, status := sched.prepareForBindingCycle(ctx, state, schedFramework, podInfo, podsToActivate, scheduleResult)
if !status.IsSuccess() {
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, status
}

return scheduleResult, assumedPodInfo, nil
}

// prepareForBindingCycle applies the schedule result, generated by the scheduling algorithm, in memory,
// preparing for a binding cycle.
func (sched *Scheduler) prepareForBindingCycle(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
podInfo *framework.QueuedPodInfo,
podsToActivate *framework.PodsToActivate,
scheduleResult ScheduleResult,
) (*framework.QueuedPodInfo, *fwk.Status) {
assumedPodInfo, status := sched.assumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult)
if !status.IsSuccess() {
return assumedPodInfo, status
}
assumedPod := assumedPodInfo.Pod

// Run "permit" plugins.
runPermitStatus := schedFramework.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
// trigger un-reserve plugins to clean up state associated with the reserved Pod
err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unreserve and forget failed")
}

if runPermitStatus.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: podInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, runPermitStatus)
fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
return assumedPodInfo, fwk.NewStatus(runPermitStatus.Code()).WithError(fitErr)
}

return assumedPodInfo, runPermitStatus
}

// At the end of a successful scheduling cycle, pop and move up Pods if needed.
if len(podsToActivate.Map) != 0 {
logger := klog.FromContext(ctx)
sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}

return assumedPodInfo, nil
}

// schedulingAlgorithm runs fitering and scoring phases for a single pod,
// together with post filter when the pod is unschedulable.
func (sched *Scheduler) schedulingAlgorithm(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
) (ScheduleResult, *fwk.Status) {
defer func() {
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
}()

pod := podInfo.Pod
scheduleResult, err := sched.SchedulePod(ctx, schedFramework, state, pod)

logger := klog.FromContext(ctx)
scheduleResult, err := sched.SchedulePod(ctx, schedFramework, state, podInfo)
if err != nil {
defer func() {
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
}()
if err == ErrNoNodesAvailable {
status := fwk.NewStatus(fwk.UnschedulableAndUnresolvable).WithError(err)
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
return ScheduleResult{nominatingInfo: clearNominatedNode}, status
}

fitError, ok := err.(*framework.FitError)
if !ok {
logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod))
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, fwk.AsStatus(err)
return ScheduleResult{nominatingInfo: clearNominatedNode}, fwk.AsStatus(err)
}

// SchedulePod() may have failed because the pod would not fit on any host, so we try to
@@ -171,7 +262,7 @@ func (sched *Scheduler) schedulingCycle(

if !schedFramework.HasPostFilterPlugins() {
logger.V(3).Info("No PostFilter plugins are registered, so no preemption will be performed")
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, fwk.NewStatus(fwk.Unschedulable).WithError(err)
return ScheduleResult{nominatingInfo: clearNominatedNode}, fwk.NewStatus(fwk.Unschedulable).WithError(err)
}

// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
@@ -188,81 +279,72 @@ func (sched *Scheduler) schedulingCycle(
if result != nil {
nominatingInfo = result.NominatingInfo
}
return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, fwk.NewStatus(fwk.Unschedulable).WithError(err)
return ScheduleResult{nominatingInfo: nominatingInfo}, fwk.NewStatus(fwk.Unschedulable).WithError(err)
}
return scheduleResult, nil
}

metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// assumeAndReserve assumes and reserves the pod in scheduler's memory.
func (sched *Scheduler) assumeAndReserve(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
podInfo *framework.QueuedPodInfo,
scheduleResult ScheduleResult,
) (*framework.QueuedPodInfo, *fwk.Status) {
logger := klog.FromContext(ctx)
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
err := sched.assume(logger, assumedPodInfo, scheduleResult.SuggestedHost)
if err != nil {
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, fwk.AsStatus(err)
return assumedPodInfo, fwk.AsStatus(err)
}

// Run the Reserve method of reserve plugins.
if sts := schedFramework.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
schedFramework.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
utilruntime.HandleErrorWithContext(ctx, forgetErr, "Scheduler cache ForgetPod failed")
err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unreserve and forget failed")
}

if sts.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: pod,
Pod: podInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, sts)
fitErr.Diagnosis.AddPluginStatus(sts)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, fwk.NewStatus(sts.Code()).WithError(fitErr)
return assumedPodInfo, fwk.NewStatus(sts.Code()).WithError(fitErr)
}
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
return assumedPodInfo, sts
}
return assumedPodInfo, nil
}

// Run "permit" plugins.
runPermitStatus := schedFramework.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
schedFramework.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
utilruntime.HandleErrorWithContext(ctx, forgetErr, "Scheduler cache ForgetPod failed")
}
// unreserveAndForget unreserves and forgets the pod from scheduler's memory.
func (sched *Scheduler) unreserveAndForget(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
assumedPodInfo *framework.QueuedPodInfo,
nodeName string,
) error {
logger := klog.FromContext(ctx)

if runPermitStatus.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, runPermitStatus)
fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, fwk.NewStatus(runPermitStatus.Code()).WithError(fitErr)
}

return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
}

// At the end of a successful scheduling cycle, pop and move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}

return scheduleResult, assumedPodInfo, nil
schedFramework.RunReservePluginsUnreserve(ctx, state, assumedPodInfo.Pod, nodeName)
return sched.Cache.ForgetPod(logger, assumedPodInfo.Pod)
}

// bindingCycle tries to bind an assumed Pod.
@@ -370,9 +452,8 @@ func (sched *Scheduler) handleBindingCycleError(

assumedPod := podInfo.Pod
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
utilruntime.HandleErrorWithContext(ctx, forgetErr, "scheduler cache ForgetPod failed")
if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil {
utilruntime.HandleErrorWithContext(ctx, forgetErr, "Unreserve and forget failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
@@ -423,7 +504,8 @@ func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Frame
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (result ScheduleResult, err error) {
pod := podInfo.Pod
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
@@ -435,7 +517,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
return result, ErrNoNodesAvailable
}

feasibleNodes, diagnosis, nodeHint, signature, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
feasibleNodes, diagnosis, nodeHint, signature, err := sched.findNodesThatFitPod(ctx, fwk, state, podInfo)
if err != nil {
return result, err
}
@@ -484,11 +566,12 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, schedFramework framework.Framework, state fwk.CycleState, pod *v1.Pod) ([]fwk.NodeInfo, framework.Diagnosis, string, fwk.PodSignature, error) {
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, schedFramework framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) ([]fwk.NodeInfo, framework.Diagnosis, string, fwk.PodSignature, error) {
logger := klog.FromContext(ctx)
diagnosis := framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
}
pod := podInfo.Pod

allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
@@ -957,21 +1040,20 @@ func (h *nodeScoreHeap) Pop() interface{} {
}

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
func (sched *Scheduler) assume(logger klog.Logger, assumedPodInfo *framework.QueuedPodInfo, host string) error {
// Optimistically assume that the binding will succeed and send it to apiserver
// in the background.
// If the binding fails, scheduler will release resources allocated to assumed pod
// immediately.
assumed.Spec.NodeName = host
assumedPodInfo.Pod.Spec.NodeName = host

if err := sched.Cache.AssumePod(logger, assumed); err != nil {
if err := sched.Cache.AssumePod(logger, assumedPodInfo.Pod); err != nil {
logger.Error(err, "Scheduler cache AssumePod failed")
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumedPodInfo.Pod)
}

return nil
@@ -1032,7 +1114,7 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {

// handleSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *fwk.Status, nominatingInfo *fwk.NominatingInfo, start time.Time) {
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, podFwk framework.Framework, podInfo *framework.QueuedPodInfo, status *fwk.Status, nominatingInfo *fwk.NominatingInfo, start time.Time) {
calledDone := false
defer func() {
if !calledDone {
@@ -1051,9 +1133,9 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo

switch reason {
case v1.PodReasonUnschedulable:
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodUnschedulable(podFwk.ProfileName(), metrics.SinceInSeconds(start))
case v1.PodReasonSchedulerError:
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodScheduleError(podFwk.ProfileName(), metrics.SinceInSeconds(start))
}

pod := podInfo.Pod
@@ -1077,7 +1159,7 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
}

// Check if the Pod exists in informer cache.
podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
podLister := podFwk.SharedInformerFactory().Core().V1().Pods().Lister()
cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
if e != nil {
logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
@@ -1114,8 +1196,8 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
}

msg := truncateMessage(errMsg)
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
if err := updatePod(ctx, sched.client, fwk.APICacher(), pod, &v1.PodCondition{
podFwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
if err := updatePod(ctx, sched.client, podFwk.APICacher(), pod, &v1.PodCondition{
Type: v1.PodScheduled,
ObservedGeneration: podutil.CalculatePodConditionObservedGeneration(&pod.Status, pod.Generation, v1.PodScheduled),
Status: v1.ConditionFalse,
@@ -1072,7 +1072,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
}
queue.Add(logger, item.sendPod)

sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) {
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) {
return item.mockScheduleResult, item.injectSchedulingError
}
sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *fwk.NominatingInfo, start time.Time) {
@@ -1647,7 +1647,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) {
}
queue.Add(logger, item.sendPod)

sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) {
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) {
return item.mockScheduleResult, item.injectSchedulingError
}
sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
@@ -3542,7 +3542,8 @@ func TestSchedulerSchedulePod(t *testing.T) {
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())

result, err := sched.SchedulePod(ctx, schedFramework, framework.NewCycleState(), test.pod)
podInfo := queuedPodInfoForPod(test.pod)
result, err := sched.SchedulePod(ctx, schedFramework, framework.NewCycleState(), podInfo)
if err != test.wErr {
gotFitErr, gotOK := err.(*framework.FitError)
wantFitErr, wantOK := test.wErr.(*framework.FitError)
@@ -3595,7 +3596,8 @@ func TestFindFitAllError(t *testing.T) {
t.Fatal(err)
}

_, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), &v1.Pod{})
podInfo := queuedPodInfoForPod(&v1.Pod{})
_, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), podInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -3641,7 +3643,8 @@ func TestFindFitSomeError(t *testing.T) {
}

pod := st.MakePod().Name("1").UID("1").Obj()
_, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), pod)
podInfo := queuedPodInfoForPod(pod)
_, diagnosis, _, _, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), podInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -3736,7 +3739,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
}
schedFramework.AddNominatedPod(logger, podinfo, &fwk.NominatingInfo{NominatingMode: fwk.ModeOverride, NominatedNodeName: "1"})

_, _, _, _, err = scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), test.pod)
podInfo := queuedPodInfoForPod(test.pod)
_, _, _, _, err = scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), podInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -4259,7 +4263,8 @@ func TestFairEvaluationForNodes(t *testing.T) {

// Iterating over all nodes more than twice
for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
nodesThatFit, _, _, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{})
podInfo := queuedPodInfoForPod(&v1.Pod{})
nodesThatFit, _, _, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), podInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -4343,7 +4348,8 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
}
sched.applyDefaultHandlers()

_, _, _, _, err = sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
podInfo := queuedPodInfoForPod(test.pod)
_, _, _, _, err = sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), podInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -4603,3 +4609,11 @@ func pop(queue clientcache.Queue) interface{} {
}
return obj
}

func queuedPodInfoForPod(pod *v1.Pod) *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{
PodInfo: &framework.PodInfo{
Pod: pod,
},
}
}
@@ -84,7 +84,7 @@ type Scheduler struct {
// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
// Return a struct of ScheduleResult with the name of suggested host on success,
// otherwise will return a FitError with reasons.
SchedulePod func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error)
SchedulePod func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error)

// Close this to shut down the scheduler.
StopEverything <-chan struct{}
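The signature change above is the core of the refactor: SchedulePod (and schedulePod, findNodesThatFitPod, assume) now receive a *framework.QueuedPodInfo and unwrap the Pod themselves. Below is a rough, self-contained sketch of how a caller adapts, mirroring the queuedPodInfoForPod test helper added in this commit; the types here are local stand-ins for illustration, not the real framework package.

// Stand-in types only; the real ones are v1.Pod, framework.PodInfo, and
// framework.QueuedPodInfo.
package main

import "fmt"

type pod struct{ Name string }

type podInfo struct{ Pod *pod }

type queuedPodInfo struct{ PodInfo *podInfo }

// wrap builds the QueuedPodInfo-style wrapper callers now pass to SchedulePod,
// shaped like the queuedPodInfoForPod helper in the test diff above.
func wrap(p *pod) *queuedPodInfo {
	return &queuedPodInfo{PodInfo: &podInfo{Pod: p}}
}

// schedulePod stands in for the new signature: it receives the queued pod info
// and unwraps the pod itself, as the refactored schedulePod does (pod := podInfo.Pod).
func schedulePod(qpi *queuedPodInfo) string {
	return "scheduling " + qpi.PodInfo.Pod.Name
}

func main() {
	fmt.Println(schedulePod(wrap(&pod{Name: "demo"})))
}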
@@ -333,7 +333,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
}
// Schedule the Pod manually.
_, fitError := testCtx.Scheduler.SchedulePod(ctx, schedFramework, framework.NewCycleState(), podInfo.Pod)
_, fitError := testCtx.Scheduler.SchedulePod(ctx, schedFramework, framework.NewCycleState(), podInfo)
// The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin.
if fitError == nil {
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)