mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
Merge pull request #134157 from macsko/forget_pod_immediately_when_it_is_deleted
Forget pod from scheduler's cache immediately when it's deleted
This commit is contained in:
commit
412bfec7a1
7 changed files with 818 additions and 156 deletions
9
pkg/scheduler/backend/cache/cache.go
vendored
9
pkg/scheduler/backend/cache/cache.go
vendored
|
|
@ -419,12 +419,15 @@ func (cache *cacheImpl) ForgetPod(logger klog.Logger, pod *v1.Pod) error {
|
|||
defer cache.mu.Unlock()
|
||||
|
||||
currState, ok := cache.podStates[key]
|
||||
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
|
||||
if !ok {
|
||||
// Pod does not exist in the cache anymore.
|
||||
return nil
|
||||
}
|
||||
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
|
||||
return fmt.Errorf("pod %v(%v) was assumed on %v but assigned to %v", key, klog.KObj(pod), pod.Spec.NodeName, currState.pod.Spec.NodeName)
|
||||
}
|
||||
|
||||
// Only assumed pod can be forgotten.
|
||||
if ok && cache.assumedPods.Has(key) {
|
||||
if cache.assumedPods.Has(key) {
|
||||
return cache.removePod(logger, pod)
|
||||
}
|
||||
return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod))
|
||||
|
|
|
|||
6
pkg/scheduler/backend/cache/cache_test.go
vendored
6
pkg/scheduler/backend/cache/cache_test.go
vendored
|
|
@ -1043,9 +1043,9 @@ func TestForgetPod(t *testing.T) {
|
|||
if err := isForgottenFromCache(pod, cache); err != nil {
|
||||
t.Errorf("pod %q: %v", pod.Name, err)
|
||||
}
|
||||
// trying to forget a pod already forgotten should return an error
|
||||
if err := cache.ForgetPod(logger, pod); err == nil {
|
||||
t.Error("expected error, no error found")
|
||||
// trying to forget a pod already forgotten should return nil
|
||||
if err := cache.ForgetPod(logger, pod); err != nil {
|
||||
t.Error("expected no error, error found")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -126,12 +126,93 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
|
||||
func (sched *Scheduler) addPod(obj interface{}) {
|
||||
logger := sched.logger
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", obj)
|
||||
return
|
||||
}
|
||||
|
||||
if assignedPod(pod) {
|
||||
sched.addAssignedPodToCache(pod)
|
||||
} else if responsibleForPod(pod, sched.Profiles) {
|
||||
sched.addPodToSchedulingQueue(pod)
|
||||
}
|
||||
}
|
||||
|
||||
func (sched *Scheduler) updatePod(oldObj, newObj interface{}) {
|
||||
logger := sched.logger
|
||||
oldPod, ok := oldObj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj)
|
||||
return
|
||||
}
|
||||
newPod, ok := newObj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
|
||||
return
|
||||
}
|
||||
|
||||
if assignedPod(oldPod) {
|
||||
sched.updateAssignedPodInCache(oldPod, newPod)
|
||||
} else if assignedPod(newPod) {
|
||||
// This update means binding operation. We can treat it as adding the pod to a cache
|
||||
// (addition to the cache will handle this binding appropriately).
|
||||
sched.addAssignedPodToCache(newPod)
|
||||
if responsibleForPod(oldPod, sched.Profiles) {
|
||||
// Pod shouldn't be in the scheduling queue, but in unlikely event that the pod has been bound
|
||||
// by another component, it should be removed from scheduling queue for correctness.
|
||||
// Passing "true" means that removal from the scheduling queue is caused by a binding event,
|
||||
// not by removal of the pod from the cluster.
|
||||
sched.deletePodFromSchedulingQueue(oldPod, true)
|
||||
}
|
||||
} else if responsibleForPod(oldPod, sched.Profiles) {
|
||||
sched.updatePodInSchedulingQueue(oldPod, newPod)
|
||||
}
|
||||
}
|
||||
|
||||
func (sched *Scheduler) deletePod(obj interface{}) {
|
||||
logger := sched.logger
|
||||
var pod *v1.Pod
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
pod = t
|
||||
if assignedPod(pod) {
|
||||
sched.deleteAssignedPodFromCache(pod)
|
||||
} else if responsibleForPod(pod, sched.Profiles) {
|
||||
// Passing "false" means that removal from the scheduling queue is caused by
|
||||
// removal of the pod from the cluster, not by a binding event.
|
||||
sched.deletePodFromSchedulingQueue(pod, false)
|
||||
}
|
||||
return
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
var ok bool
|
||||
pod, ok = t.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
|
||||
return
|
||||
}
|
||||
// The carried object may be stale, so we don't use it to check if
|
||||
// it's assigned or not. Attempting to cleanup anyways.
|
||||
sched.deleteAssignedPodFromCache(pod)
|
||||
if responsibleForPod(pod, sched.Profiles) {
|
||||
// Passing "false" means that removal from the scheduling queue is caused by
|
||||
// removal of the pod from the cluster, not by a binding event.
|
||||
sched.deletePodFromSchedulingQueue(pod, false)
|
||||
}
|
||||
return
|
||||
default:
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (sched *Scheduler) addPodToSchedulingQueue(pod *v1.Pod) {
|
||||
start := time.Now()
|
||||
defer metrics.EventHandlingLatency.WithLabelValues(framework.EventUnscheduledPodAdd.Label()).Observe(metrics.SinceInSeconds(start))
|
||||
|
||||
logger := sched.logger
|
||||
pod := obj.(*v1.Pod)
|
||||
logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod))
|
||||
sched.SchedulingQueue.Add(logger, pod)
|
||||
}
|
||||
|
|
@ -150,10 +231,52 @@ func (sched *Scheduler) syncPodWithDispatcher(pod *v1.Pod) *v1.Pod {
|
|||
return enrichedPod
|
||||
}
|
||||
|
||||
func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
|
||||
// handleAssumedPodDeletion is an event handler that deals with the deletion of an assumed pod.
|
||||
// We must remove it from the scheduler's cache immediately to prevent it from blocking resources for other pending pods,
|
||||
// causing unnecessary preemption attempts. Note that PreBinding/Binding will continue, but is eventually expected to fail
|
||||
// as the pod does not exist in the kube-apiserver anymore and so in the scheduler cache.
|
||||
func (sched *Scheduler) handleAssumedPodDeletion(pod *v1.Pod) {
|
||||
logger := sched.logger
|
||||
// We must operate on the pod from the scheduler's cache, not the one from the event.
|
||||
// The cached version has the assigned NodeName and represents the resources being consumed.
|
||||
assumedPod, err := sched.Cache.GetPod(pod)
|
||||
if err != nil {
|
||||
// This is not an error. The pod may have already completed its binding cycle and been
|
||||
// removed from the cache. Nothing more to do.
|
||||
logger.V(5).Info("Assumed pod was already forgotten", "pod", klog.KObj(pod))
|
||||
return
|
||||
}
|
||||
pod = assumedPod
|
||||
|
||||
fwk, err := sched.frameworkForPod(pod)
|
||||
if err != nil {
|
||||
// This shouldn't happen, because we only accept for scheduling the pods
|
||||
// which specify a scheduler name that matches one of the profiles.
|
||||
utilruntime.HandleErrorWithLogger(logger, err, "Unable to get profile for pod", "pod", klog.KObj(pod))
|
||||
return
|
||||
}
|
||||
|
||||
// The pod might be in one of two states:
|
||||
// 1. If the pod is waiting on WaitOnPermit, we reject it. This causes the pod's scheduling
|
||||
// cycle to quickly fail gracefully, and it will clean itself up via `handleBindingCycleError`.
|
||||
if !fwk.RejectWaitingPod(pod.UID) {
|
||||
// 2. If the pod is no longer waiting (e.g., it's in PreBind or Bind), we can't quickly reject it.
|
||||
// We must explicitly remove it from the cache here to free up its assumed resources.
|
||||
if err := sched.Cache.ForgetPod(logger, pod); err != nil {
|
||||
utilruntime.HandleErrorWithLogger(logger, err, "Scheduler cache ForgetPod failed", "pod", klog.KObj(pod))
|
||||
}
|
||||
}
|
||||
|
||||
// The removal of this assumed pod may have freed up resources. We trigger the AssignedPodDelete event
|
||||
// to move other unscheduled pods, giving them a chance to be scheduled.
|
||||
// If the forgotten pod reserved some resources in memory,
|
||||
// it will wake up the pods again after freeing up the resources in `handleBindingCycleError`.
|
||||
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, nil)
|
||||
}
|
||||
|
||||
func (sched *Scheduler) updatePodInSchedulingQueue(oldPod, newPod *v1.Pod) {
|
||||
start := time.Now()
|
||||
logger := sched.logger
|
||||
oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
|
||||
// Bypass update event that carries identical objects; otherwise, a duplicated
|
||||
// Pod may go through scheduling and cause unexpected behavior (see #96071).
|
||||
if oldPod.ResourceVersion == newPod.ResourceVersion {
|
||||
|
|
@ -178,6 +301,11 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
|
|||
utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(newPod))
|
||||
}
|
||||
if isAssumed {
|
||||
if newPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil {
|
||||
// Assumed pod deletion has started. We should handle that differently,
|
||||
// because we can't update such pod in any structure directly.
|
||||
sched.handleAssumedPodDeletion(newPod)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -195,45 +323,33 @@ func hasNominatedNodeNameChanged(oldPod, newPod *v1.Pod) bool {
|
|||
return len(oldPod.Status.NominatedNodeName) > 0 && oldPod.Status.NominatedNodeName != newPod.Status.NominatedNodeName
|
||||
}
|
||||
|
||||
func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
|
||||
func (sched *Scheduler) deletePodFromSchedulingQueue(pod *v1.Pod, inBinding bool) {
|
||||
start := time.Now()
|
||||
defer metrics.EventHandlingLatency.WithLabelValues(framework.EventUnscheduledPodDelete.Label()).Observe(metrics.SinceInSeconds(start))
|
||||
|
||||
logger := sched.logger
|
||||
var pod *v1.Pod
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
pod = obj.(*v1.Pod)
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
var ok bool
|
||||
pod, ok = t.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
|
||||
return
|
||||
}
|
||||
default:
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj)
|
||||
return
|
||||
}
|
||||
|
||||
logger.V(3).Info("Delete event for unscheduled pod", "pod", klog.KObj(pod))
|
||||
sched.SchedulingQueue.Delete(pod)
|
||||
fwk, err := sched.frameworkForPod(pod)
|
||||
if err != nil {
|
||||
// This shouldn't happen, because we only accept for scheduling the pods
|
||||
// which specify a scheduler name that matches one of the profiles.
|
||||
utilruntime.HandleErrorWithLogger(logger, err, "Unable to get profile", "pod", klog.KObj(pod))
|
||||
if inBinding {
|
||||
// In the case of a binding, the rest can be skipped because it is not really a pod removal operation, but a binding.
|
||||
// Any necessary notifications will be sent by the binding process, unless it was an unlikely external binding.
|
||||
// In that case, we need to notify about the release of resources that were held by different assume/nomination
|
||||
// once https://github.com/kubernetes/kubernetes/issues/134859 is fixed.
|
||||
return
|
||||
}
|
||||
// If a waiting pod is rejected, it indicates it's previously assumed and we're
|
||||
// removing it from the scheduler cache. In this case, signal a AssignedPodDelete
|
||||
// event to immediately retry some unscheduled Pods.
|
||||
// Similarly when a pod that had nominated node is deleted, it can unblock scheduling of other pods,
|
||||
// because the lower or equal priority pods treat such a pod as if it was assigned.
|
||||
if fwk.RejectWaitingPod(pod.UID) {
|
||||
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, nil)
|
||||
isAssumed, err := sched.Cache.IsAssumedPod(pod)
|
||||
if err != nil {
|
||||
utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(pod))
|
||||
}
|
||||
if isAssumed {
|
||||
// Assumed pod is deleted. We should handle that differently,
|
||||
// because we can't delete such pod from any structure directly.
|
||||
sched.handleAssumedPodDeletion(pod)
|
||||
} else if pod.Status.NominatedNodeName != "" {
|
||||
// Note that a nominated pod can fall into `RejectWaitingPod` case as well,
|
||||
// When a pod that had nominated node is deleted, it can unblock scheduling of other pods,
|
||||
// because the lower or equal priority pods treat such a pod as if it was assigned.
|
||||
// Note that a nominated pod can fall into `handleAssumedPodDeletion` case as well,
|
||||
// but in that case the `MoveAllToActiveOrBackoffQueue` already covered lower priority pods.
|
||||
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(pod)))
|
||||
}
|
||||
|
|
@ -246,16 +362,11 @@ func getLEPriorityPreCheck(priority int32) queue.PreEnqueueCheck {
|
|||
}
|
||||
}
|
||||
|
||||
func (sched *Scheduler) addPodToCache(obj interface{}) {
|
||||
func (sched *Scheduler) addAssignedPodToCache(pod *v1.Pod) {
|
||||
start := time.Now()
|
||||
defer metrics.EventHandlingLatency.WithLabelValues(framework.EventAssignedPodAdd.Label()).Observe(metrics.SinceInSeconds(start))
|
||||
|
||||
logger := sched.logger
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", obj)
|
||||
return
|
||||
}
|
||||
|
||||
logger.V(3).Info("Add event for scheduled pod", "pod", klog.KObj(pod))
|
||||
if err := sched.Cache.AddPod(logger, pod); err != nil {
|
||||
|
|
@ -278,21 +389,11 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
|
||||
func (sched *Scheduler) updateAssignedPodInCache(oldPod, newPod *v1.Pod) {
|
||||
start := time.Now()
|
||||
defer metrics.EventHandlingLatency.WithLabelValues(framework.EventAssignedPodUpdate.Label()).Observe(metrics.SinceInSeconds(start))
|
||||
|
||||
logger := sched.logger
|
||||
oldPod, ok := oldObj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj)
|
||||
return
|
||||
}
|
||||
newPod, ok := newObj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
|
||||
return
|
||||
}
|
||||
|
||||
if sched.APIDispatcher != nil {
|
||||
// If the API dispatcher is available, sync the new pod with the details.
|
||||
|
|
@ -331,26 +432,11 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
func (sched *Scheduler) deletePodFromCache(obj interface{}) {
|
||||
func (sched *Scheduler) deleteAssignedPodFromCache(pod *v1.Pod) {
|
||||
start := time.Now()
|
||||
defer metrics.EventHandlingLatency.WithLabelValues(framework.EventAssignedPodDelete.Label()).Observe(metrics.SinceInSeconds(start))
|
||||
|
||||
logger := sched.logger
|
||||
var pod *v1.Pod
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
pod = t
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
var ok bool
|
||||
pod, ok = t.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
|
||||
return
|
||||
}
|
||||
default:
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj)
|
||||
return
|
||||
}
|
||||
|
||||
logger.V(3).Info("Delete event for scheduled pod", "pod", klog.KObj(pod))
|
||||
if err := sched.Cache.RemovePod(logger, pod); err != nil {
|
||||
|
|
@ -405,64 +491,12 @@ func addAllEventHandlers(
|
|||
)
|
||||
|
||||
logger := sched.logger
|
||||
// scheduled pod cache
|
||||
if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
|
||||
cache.FilteringResourceEventHandler{
|
||||
FilterFunc: func(obj interface{}) bool {
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
return assignedPod(t)
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
if _, ok := t.Obj.(*v1.Pod); ok {
|
||||
// The carried object may be stale, so we don't use it to check if
|
||||
// it's assigned or not. Attempting to cleanup anyways.
|
||||
return true
|
||||
}
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
|
||||
return false
|
||||
default:
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj)
|
||||
return false
|
||||
}
|
||||
},
|
||||
Handler: cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: sched.addPodToCache,
|
||||
UpdateFunc: sched.updatePodInCache,
|
||||
DeleteFunc: sched.deletePodFromCache,
|
||||
},
|
||||
},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
handlers = append(handlers, handlerRegistration)
|
||||
|
||||
// unscheduled pod queue
|
||||
if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
|
||||
cache.FilteringResourceEventHandler{
|
||||
FilterFunc: func(obj interface{}) bool {
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
if pod, ok := t.Obj.(*v1.Pod); ok {
|
||||
// The carried object may be stale, so we don't use it to check if
|
||||
// it's assigned or not.
|
||||
return responsibleForPod(pod, sched.Profiles)
|
||||
}
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
|
||||
return false
|
||||
default:
|
||||
utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj)
|
||||
return false
|
||||
}
|
||||
},
|
||||
Handler: cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: sched.addPodToSchedulingQueue,
|
||||
UpdateFunc: sched.updatePodInSchedulingQueue,
|
||||
DeleteFunc: sched.deletePodFromSchedulingQueue,
|
||||
},
|
||||
},
|
||||
); err != nil {
|
||||
if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: sched.addPod,
|
||||
UpdateFunc: sched.updatePod,
|
||||
DeleteFunc: sched.deletePod,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
handlers = append(handlers, handlerRegistration)
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ import (
|
|||
dyfake "k8s.io/client-go/dynamic/fake"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
|
||||
"k8s.io/klog/v2"
|
||||
|
|
@ -52,13 +53,17 @@ import (
|
|||
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
|
||||
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/pkg/scheduler/profile"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
|
||||
)
|
||||
|
|
@ -127,7 +132,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
|
|||
{
|
||||
name: "Removal of a pod that had nominated node name should trigger rescheduling of lower priority pods",
|
||||
updateFunc: func(s *Scheduler) {
|
||||
s.deletePodFromSchedulingQueue(medNominatedPriorityPod)
|
||||
s.deletePodFromSchedulingQueue(medNominatedPriorityPod, false)
|
||||
},
|
||||
wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name),
|
||||
},
|
||||
|
|
@ -222,24 +227,24 @@ func newDefaultQueueSort() fwk.LessFunc {
|
|||
return sort.Less
|
||||
}
|
||||
|
||||
func TestUpdatePodInCache(t *testing.T) {
|
||||
func TestUpdateAssignedPodInCache(t *testing.T) {
|
||||
ttl := 10 * time.Second
|
||||
nodeName := "node"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
oldObj interface{}
|
||||
newObj interface{}
|
||||
oldPod *v1.Pod
|
||||
newPod *v1.Pod
|
||||
}{
|
||||
{
|
||||
name: "pod updated with the same UID",
|
||||
oldObj: withPodName(podWithPort("oldUID", nodeName, 80), "pod"),
|
||||
newObj: withPodName(podWithPort("oldUID", nodeName, 8080), "pod"),
|
||||
oldPod: withPodName(podWithPort("oldUID", nodeName, 80), "pod"),
|
||||
newPod: withPodName(podWithPort("oldUID", nodeName, 8080), "pod"),
|
||||
},
|
||||
{
|
||||
name: "pod updated with different UIDs",
|
||||
oldObj: withPodName(podWithPort("oldUID", nodeName, 80), "pod"),
|
||||
newObj: withPodName(podWithPort("newUID", nodeName, 8080), "pod"),
|
||||
oldPod: withPodName(podWithPort("oldUID", nodeName, 80), "pod"),
|
||||
newPod: withPodName(podWithPort("newUID", nodeName, 8080), "pod"),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
|
@ -252,20 +257,20 @@ func TestUpdatePodInCache(t *testing.T) {
|
|||
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
|
||||
logger: logger,
|
||||
}
|
||||
sched.addPodToCache(tt.oldObj)
|
||||
sched.updatePodInCache(tt.oldObj, tt.newObj)
|
||||
sched.addAssignedPodToCache(tt.oldPod)
|
||||
sched.updateAssignedPodInCache(tt.oldPod, tt.newPod)
|
||||
|
||||
if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID {
|
||||
if pod, err := sched.Cache.GetPod(tt.oldObj.(*v1.Pod)); err == nil {
|
||||
if tt.oldPod.UID != tt.newPod.UID {
|
||||
if pod, err := sched.Cache.GetPod(tt.oldPod); err == nil {
|
||||
t.Errorf("Get pod UID %v from cache but it should not happen", pod.UID)
|
||||
}
|
||||
}
|
||||
pod, err := sched.Cache.GetPod(tt.newObj.(*v1.Pod))
|
||||
pod, err := sched.Cache.GetPod(tt.newPod)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get pod from scheduler: %v", err)
|
||||
}
|
||||
if pod.UID != tt.newObj.(*v1.Pod).UID {
|
||||
t.Errorf("Want pod UID %v, got %v", tt.newObj.(*v1.Pod).UID, pod.UID)
|
||||
if pod.UID != tt.newPod.UID {
|
||||
t.Errorf("Want pod UID %v, got %v", tt.newPod.UID, pod.UID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -677,3 +682,317 @@ func TestAdmissionCheck(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddPod(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
expectInQueue bool
|
||||
expectInCache bool
|
||||
}{
|
||||
{
|
||||
name: "add unscheduled pod",
|
||||
pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("supported-scheduler").Obj(),
|
||||
expectInQueue: true,
|
||||
},
|
||||
{
|
||||
name: "add unscheduled pod with other scheduler name",
|
||||
pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("other-scheduler").Obj(),
|
||||
},
|
||||
{
|
||||
name: "add scheduled pod",
|
||||
pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").Obj(),
|
||||
expectInCache: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
sched := &Scheduler{
|
||||
Cache: internalcache.New(ctx, 0, nil),
|
||||
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
|
||||
logger: logger,
|
||||
Profiles: profile.Map{
|
||||
"supported-scheduler": nil,
|
||||
},
|
||||
}
|
||||
|
||||
sched.addPod(tt.pod)
|
||||
|
||||
_, ok := sched.SchedulingQueue.GetPod(tt.pod.Name, tt.pod.Namespace)
|
||||
if tt.expectInQueue && !ok {
|
||||
t.Errorf("Expected pod to be in scheduling queue")
|
||||
} else if !tt.expectInQueue && ok {
|
||||
t.Errorf("Expected pod not to be in scheduling queue")
|
||||
}
|
||||
_, err := sched.Cache.GetPod(tt.pod)
|
||||
if tt.expectInCache && err != nil {
|
||||
t.Errorf("Expected pod to be in cache: %v", err)
|
||||
} else if !tt.expectInCache && err == nil {
|
||||
t.Errorf("Expected pod not to be in cache")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdatePod(t *testing.T) {
|
||||
pod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("supported-scheduler").Obj()
|
||||
updatedPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").SchedulerName("supported-scheduler").Obj()
|
||||
|
||||
podWithDeletionTimestamp := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Terminating().ResourceVersion("2").SchedulerName("supported-scheduler").Obj()
|
||||
|
||||
otherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("other-scheduler").Obj()
|
||||
updatedOtherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").SchedulerName("other-scheduler").Obj()
|
||||
|
||||
scheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("supported-scheduler").Obj()
|
||||
updatedScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").Node("node1").SchedulerName("supported-scheduler").Obj()
|
||||
|
||||
otherScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("other-scheduler").Obj()
|
||||
updatedOtherScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").Node("node1").SchedulerName("other-scheduler").Obj()
|
||||
|
||||
scheduledPodOtherNode := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node2").SchedulerName("supported-scheduler").Obj()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
oldPod *v1.Pod
|
||||
assumedPod *v1.Pod
|
||||
newPod *v1.Pod
|
||||
expectInQueue *v1.Pod
|
||||
expectInCache *v1.Pod
|
||||
}{
|
||||
{
|
||||
name: "update unscheduled pod",
|
||||
oldPod: pod,
|
||||
newPod: updatedPod,
|
||||
expectInQueue: updatedPod,
|
||||
},
|
||||
{
|
||||
name: "update unscheduled pod with other scheduler name",
|
||||
oldPod: otherPod,
|
||||
newPod: updatedOtherPod,
|
||||
},
|
||||
{
|
||||
name: "update assumed pod",
|
||||
oldPod: pod,
|
||||
assumedPod: scheduledPod,
|
||||
newPod: updatedPod,
|
||||
expectInCache: scheduledPod,
|
||||
},
|
||||
{
|
||||
name: "update scheduled pod",
|
||||
oldPod: scheduledPod,
|
||||
newPod: updatedScheduledPod,
|
||||
expectInCache: updatedScheduledPod,
|
||||
},
|
||||
{
|
||||
name: "update scheduled pod with other scheduler name",
|
||||
oldPod: otherScheduledPod,
|
||||
newPod: updatedOtherScheduledPod,
|
||||
expectInCache: updatedOtherScheduledPod,
|
||||
},
|
||||
{
|
||||
name: "bind unscheduled pod",
|
||||
oldPod: pod,
|
||||
newPod: scheduledPod,
|
||||
expectInCache: scheduledPod,
|
||||
},
|
||||
{
|
||||
name: "bind unscheduled pod with other scheduler name",
|
||||
oldPod: pod,
|
||||
newPod: scheduledPod,
|
||||
expectInCache: scheduledPod,
|
||||
},
|
||||
{
|
||||
name: "bind assumed pod",
|
||||
oldPod: pod,
|
||||
assumedPod: scheduledPod,
|
||||
newPod: scheduledPod,
|
||||
expectInCache: scheduledPod,
|
||||
},
|
||||
{
|
||||
name: "bind assumed pod to a different node",
|
||||
oldPod: pod,
|
||||
assumedPod: scheduledPod,
|
||||
newPod: scheduledPodOtherNode,
|
||||
expectInCache: scheduledPodOtherNode,
|
||||
},
|
||||
{
|
||||
name: "delete assumed pod with deletion timestamp",
|
||||
oldPod: pod,
|
||||
assumedPod: scheduledPod,
|
||||
newPod: podWithDeletionTimestamp,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
registerPluginFuncs := []tf.RegisterPluginFunc{
|
||||
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
}
|
||||
waitingPods := frameworkruntime.NewWaitingPodsMap()
|
||||
schedFramework, err := tf.NewFramework(
|
||||
ctx,
|
||||
registerPluginFuncs,
|
||||
"supported-scheduler",
|
||||
frameworkruntime.WithWaitingPods(waitingPods),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework: %v", err)
|
||||
}
|
||||
sched := &Scheduler{
|
||||
Cache: internalcache.New(ctx, 0, nil),
|
||||
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
|
||||
logger: logger,
|
||||
Profiles: profile.Map{
|
||||
"supported-scheduler": schedFramework,
|
||||
},
|
||||
}
|
||||
|
||||
if tt.assumedPod != nil {
|
||||
err := sched.Cache.AssumePod(logger, tt.assumedPod)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to assume pod: %v", err)
|
||||
}
|
||||
} else {
|
||||
sched.addPod(tt.oldPod)
|
||||
}
|
||||
|
||||
sched.updatePod(tt.oldPod, tt.newPod)
|
||||
|
||||
qPod, ok := sched.SchedulingQueue.GetPod(tt.newPod.Name, tt.newPod.Namespace)
|
||||
if tt.expectInQueue != nil {
|
||||
if !ok {
|
||||
t.Errorf("Expected pod to be in scheduling queue")
|
||||
} else if diff := cmp.Diff(tt.expectInQueue, qPod.Pod); diff != "" {
|
||||
t.Errorf("Unexpected pod after update (-want,+got):\n%s", diff)
|
||||
}
|
||||
} else if ok {
|
||||
t.Errorf("Expected pod not to be in scheduling queue")
|
||||
}
|
||||
pod, err := sched.Cache.GetPod(tt.newPod)
|
||||
if tt.expectInCache != nil {
|
||||
if err != nil {
|
||||
t.Errorf("Expected pod to be in cache: %v", err)
|
||||
} else if diff := cmp.Diff(tt.expectInCache, pod); diff != "" {
|
||||
t.Errorf("Unexpected pod after update (-want,+got):\n%s", diff)
|
||||
}
|
||||
} else if err == nil {
|
||||
t.Errorf("Expected pod not to be in cache")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeletePod(t *testing.T) {
|
||||
pod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("supported-scheduler").Obj()
|
||||
otherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("other-scheduler").Obj()
|
||||
scheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("supported-scheduler").Obj()
|
||||
otherScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("other-scheduler").Obj()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
initialPod *v1.Pod
|
||||
assumed bool
|
||||
waitingOnPermit bool
|
||||
podToDelete any
|
||||
}{
|
||||
{
|
||||
name: "delete unscheduled pod",
|
||||
initialPod: pod,
|
||||
podToDelete: pod,
|
||||
},
|
||||
{
|
||||
name: "delete unscheduled pod with other scheduler name",
|
||||
initialPod: otherPod,
|
||||
podToDelete: otherPod,
|
||||
},
|
||||
{
|
||||
name: "delete unscheduled pod with unknown state",
|
||||
initialPod: pod,
|
||||
podToDelete: cache.DeletedFinalStateUnknown{Obj: pod},
|
||||
},
|
||||
{
|
||||
name: "delete assumed pod",
|
||||
initialPod: scheduledPod,
|
||||
assumed: true,
|
||||
podToDelete: pod,
|
||||
},
|
||||
{
|
||||
name: "delete scheduled pod",
|
||||
initialPod: scheduledPod,
|
||||
podToDelete: scheduledPod,
|
||||
},
|
||||
{
|
||||
name: "delete scheduled pod with other scheduler name",
|
||||
initialPod: otherScheduledPod,
|
||||
podToDelete: otherScheduledPod,
|
||||
},
|
||||
{
|
||||
name: "delete scheduled pod with unknown state",
|
||||
initialPod: scheduledPod,
|
||||
podToDelete: cache.DeletedFinalStateUnknown{Obj: scheduledPod},
|
||||
},
|
||||
{
|
||||
name: "delete scheduled pod with unknown older state",
|
||||
initialPod: scheduledPod,
|
||||
podToDelete: cache.DeletedFinalStateUnknown{Obj: pod},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
registerPluginFuncs := []tf.RegisterPluginFunc{
|
||||
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
}
|
||||
waitingPods := frameworkruntime.NewWaitingPodsMap()
|
||||
schedFramework, err := tf.NewFramework(
|
||||
ctx,
|
||||
registerPluginFuncs,
|
||||
"supported-scheduler",
|
||||
frameworkruntime.WithWaitingPods(waitingPods),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework: %v", err)
|
||||
}
|
||||
sched := &Scheduler{
|
||||
Cache: internalcache.New(ctx, 0, nil),
|
||||
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
|
||||
logger: logger,
|
||||
Profiles: profile.Map{
|
||||
"supported-scheduler": schedFramework,
|
||||
},
|
||||
}
|
||||
|
||||
if tt.assumed {
|
||||
err := sched.Cache.AssumePod(logger, tt.initialPod)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to assume pod: %v", err)
|
||||
}
|
||||
} else {
|
||||
sched.addPod(tt.initialPod)
|
||||
}
|
||||
|
||||
sched.deletePod(tt.podToDelete)
|
||||
|
||||
_, err = sched.Cache.GetPod(tt.initialPod)
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected pod in cache after removal")
|
||||
}
|
||||
_, ok := sched.SchedulingQueue.GetPod(tt.initialPod.Name, tt.initialPod.Namespace)
|
||||
if ok {
|
||||
t.Errorf("Unexpected pod in scheduling queue after removal")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2097,7 +2097,7 @@ func TestSchedulerBinding(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUpdatePod(t *testing.T) {
|
||||
func TestUpdatePodStatus(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
currentPodConditions []v1.PodCondition
|
||||
|
|
|
|||
|
|
@ -417,6 +417,12 @@ func (p *PodWrapper) ZeroTerminationGracePeriod() *PodWrapper {
|
|||
return p
|
||||
}
|
||||
|
||||
// TerminationGracePeriodSeconds sets the TerminationGracePeriodSeconds of the inner pod.
|
||||
func (p *PodWrapper) TerminationGracePeriodSeconds(s int64) *PodWrapper {
|
||||
p.Spec.TerminationGracePeriodSeconds = &s
|
||||
return p
|
||||
}
|
||||
|
||||
// Node sets `s` as the nodeName of the inner pod.
|
||||
func (p *PodWrapper) Node(s string) *PodWrapper {
|
||||
p.Spec.NodeName = s
|
||||
|
|
@ -562,6 +568,12 @@ func (p *PodWrapper) SchedulingGates(gates []string) *PodWrapper {
|
|||
return p
|
||||
}
|
||||
|
||||
// ResourceVersion sets the inner pod's ResurceVersion.
|
||||
func (p *PodWrapper) ResourceVersion(version string) *PodWrapper {
|
||||
p.ObjectMeta.ResourceVersion = version
|
||||
return p
|
||||
}
|
||||
|
||||
// PodAffinityKind represents different kinds of PodAffinity.
|
||||
type PodAffinityKind int
|
||||
|
||||
|
|
|
|||
|
|
@ -484,6 +484,7 @@ func TestPreemption(t *testing.T) {
|
|||
|
||||
func TestAsyncPreemption(t *testing.T) {
|
||||
const podBlockedInBindingName = "pod-blocked-in-binding"
|
||||
const reservingPodName = "reserving-pod"
|
||||
|
||||
type createPod struct {
|
||||
pod *v1.Pod
|
||||
|
|
@ -811,11 +812,11 @@ func TestAsyncPreemption(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134249
|
||||
// This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217
|
||||
// Scenario reproduces the issue:
|
||||
// Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then gets activated by some unknown trigger.
|
||||
// Preemptor pod is expected to go back to unschedulable queue and remain there until victim binding and preemption is completed.
|
||||
name: "victim blocked in binding, preemptor pod gets activated randomly and returns to unschedulable queue until victim is bound and deleted",
|
||||
// Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim is deleted.
|
||||
// Preemptor pod is woken up by the Pod/Delete event and is being scheduled, even before the victim binding is terminated.
|
||||
name: "victim blocked in binding, preemptor pod gets scheduled after victim-in-binding is deleted",
|
||||
scenarios: []scenario{
|
||||
{
|
||||
name: "create victim Pod that is going to be blocked in binding",
|
||||
|
|
@ -847,11 +848,45 @@ func TestAsyncPreemption(t *testing.T) {
|
|||
completePreemption: "preemptor",
|
||||
},
|
||||
{
|
||||
name: "activate preemptor Pod, simulating a random event that activated it",
|
||||
activatePod: "preemptor",
|
||||
name: "schedule the preemptor Pod again and expect it to be scheduled (assumed victim pod was forgotten)",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectSuccess: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod again and expect it to end up in unschedulable (waiting for preemption to finish)",
|
||||
name: "resume binding of the blocked pod",
|
||||
resumeBind: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217
|
||||
// Scenario reproduces the issue, but with a victim that is under graceful termination:
|
||||
// Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim's graceful termination is initiated.
|
||||
// Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted) and is being scheduled, even before the victim binding is terminated.
|
||||
name: "victim blocked in binding, preemptor pod gets scheduled when victim-in-binding is under graceful termination",
|
||||
scenarios: []scenario{
|
||||
{
|
||||
name: "create victim Pod with long termination grace period that is going to be blocked in binding",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name(podBlockedInBindingName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).TerminationGracePeriodSeconds(1000).Container("image").Priority(1).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule victim Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: podBlockedInBindingName,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create a preemptor Pod",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectUnschedulable: true,
|
||||
|
|
@ -862,15 +897,170 @@ func TestAsyncPreemption(t *testing.T) {
|
|||
completePreemption: "preemptor",
|
||||
},
|
||||
{
|
||||
name: "check that preemptor remained in unschedulable queue",
|
||||
verifyPodInUnschedulable: "preemptor",
|
||||
name: "schedule the preemptor Pod again and expect it to be scheduled (assumed victim pod was forgotten)",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectSuccess: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217
|
||||
// Scenario reproduces the issue, but with a victim that is under graceful termination:
|
||||
// Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim's graceful termination is initiated.
|
||||
// Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted) and is being scheduled, even before the victim binding is terminated.
|
||||
name: "victim blocked in binding, preemptor pod gets scheduled when victim-in-binding is under graceful termination",
|
||||
scenarios: []scenario{
|
||||
{
|
||||
name: "create victim Pod with long termination grace period that is going to be blocked in binding",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name(podBlockedInBindingName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").TerminationGracePeriodSeconds(1000).Priority(1).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule victim Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: podBlockedInBindingName,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create a preemptor Pod",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectUnschedulable: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "complete the preemption API call",
|
||||
completePreemption: "preemptor",
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod again and expect it to be scheduled (assumed victim pod was forgotten)",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectSuccess: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "resume binding of the blocked pod",
|
||||
resumeBind: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217
|
||||
// Scenario reproduces the issue, but with a victim that is reserving some resources required by the preemptor:
|
||||
// Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim is deleted.
|
||||
// Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted), but is still unschedulable, because victim has to unreserve its resources.
|
||||
// After resuming binding for a victim, it releases the resources in its failure handler, preemptor is woken up again and ultimately scheduled.
|
||||
name: "victim blocked in binding, preemptor pod gets scheduled after victim-in-binding is deleted and its resources are unreserved",
|
||||
scenarios: []scenario{
|
||||
{
|
||||
name: "create victim Pod that is going to be blocked in binding",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name(podBlockedInBindingName + reservingPodName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule victim Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: podBlockedInBindingName + reservingPodName,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create a preemptor Pod",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectUnschedulable: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "complete the preemption API call",
|
||||
completePreemption: "preemptor",
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod again and expect it to be unschedulable (resources are still reserved by the victim)",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectUnschedulable: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "resume binding of the blocked pod",
|
||||
resumeBind: true,
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod after the completed binding and preemption of the blocked pod",
|
||||
name: "schedule the preemptor Pod again and expect it to be scheduled (victim pod unreserved its resources)",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectSuccess: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217
|
||||
// Scenario reproduces the issue, but with a victim that is under graceful termination and is reserving some resources required by the preemptor:
|
||||
// Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim's graceful termination is initiated.
|
||||
// Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted), but is still unschedulable, because victim has to unreserve its resources.
|
||||
// After resuming binding for a victim, it releases the resources in its failure handler, preemptor is woken up again and ultimately scheduled.
|
||||
name: "victim blocked in binding, preemptor pod gets scheduled after victim-in-binding is under graceful termination and its resources are unreserved",
|
||||
scenarios: []scenario{
|
||||
{
|
||||
name: "create victim Pod that is going to be blocked in binding",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name(podBlockedInBindingName + reservingPodName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").TerminationGracePeriodSeconds(1000).Priority(1).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule victim Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: podBlockedInBindingName + reservingPodName,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create a preemptor Pod",
|
||||
createPod: &createPod{
|
||||
pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectUnschedulable: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "complete the preemption API call",
|
||||
completePreemption: "preemptor",
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod again and expect it to be unschedulable (resources are still reserved by the victim)",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectUnschedulable: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "resume binding of the blocked pod",
|
||||
resumeBind: true,
|
||||
},
|
||||
{
|
||||
name: "schedule the preemptor Pod again and expect it to be scheduled (victim pod unreserved its resources)",
|
||||
schedulePod: &schedulePod{
|
||||
podName: "preemptor",
|
||||
expectSuccess: true,
|
||||
|
|
@ -882,7 +1072,7 @@ func TestAsyncPreemption(t *testing.T) {
|
|||
|
||||
// All test cases have the same node.
|
||||
node := st.MakeNode().Name("node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj()
|
||||
for _, asyncAPICallsEnabled := range []bool{true, false} {
|
||||
for _, asyncAPICallsEnabled := range []bool{true} {
|
||||
for _, test := range tests {
|
||||
t.Run(fmt.Sprintf("%s (Async API calls enabled: %v)", test.name, asyncAPICallsEnabled), func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncAPICalls, asyncAPICallsEnabled)
|
||||
|
|
@ -956,6 +1146,19 @@ func TestAsyncPreemption(t *testing.T) {
|
|||
t.Fatalf("Error registering a bind plugin: %v", err)
|
||||
}
|
||||
|
||||
// Register fake plugin that will reserve some fake resources for one pod.
|
||||
// This could be used to check scheduler's behavior when the victim has to unreserve these resources to let the preemptor schedule.
|
||||
reservingPluginName := "reservingPlugin"
|
||||
err = registry.Register(reservingPluginName, func(ctx context.Context, o runtime.Object, fh fwk.Handle) (fwk.Plugin, error) {
|
||||
return &reservingPlugin{
|
||||
name: reservingPluginName,
|
||||
nameOfPodToReserve: reservingPodName,
|
||||
}, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error registering a reserving plugin: %v", err)
|
||||
}
|
||||
|
||||
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
|
||||
Profiles: []configv1.KubeSchedulerProfile{{
|
||||
SchedulerName: ptr.To(v1.DefaultSchedulerName),
|
||||
|
|
@ -964,6 +1167,7 @@ func TestAsyncPreemption(t *testing.T) {
|
|||
Enabled: []configv1.Plugin{
|
||||
{Name: blockingBindPluginName},
|
||||
{Name: delayedPreemptionPluginName},
|
||||
{Name: reservingPluginName},
|
||||
},
|
||||
Disabled: []configv1.Plugin{
|
||||
{Name: names.DefaultPreemption},
|
||||
|
|
@ -1176,3 +1380,93 @@ func (bp *blockingBindPlugin) Bind(ctx context.Context, state fwk.CycleState, p
|
|||
}
|
||||
|
||||
var _ fwk.BindPlugin = &blockingBindPlugin{}
|
||||
|
||||
// reservingPlugin is a fake plugin that keeps an in-memory reservation on behalf of
// a pod whose name contains nameOfPodToReserve.
// While the reservation is held, Filter rejects pods; they can be scheduled again
// only after the reservation is released by Unreserve.
type reservingPlugin struct {
	// name is the value returned by Name.
	name string
	// nameOfPodToReserve is the substring of a pod name that triggers (un)reservation.
	nameOfPodToReserve string

	// lock guards reserved.
	lock sync.Mutex
	// reserved is set by Reserve and cleared by Unreserve.
	reserved bool
}
|
||||
|
||||
func (rp *reservingPlugin) Name() string {
|
||||
return rp.name
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) EventsToRegister(_ context.Context) ([]fwk.ClusterEventWithHint, error) {
|
||||
return []fwk.ClusterEventWithHint{
|
||||
// Plugin will wake up the pod on any Pod/Delete event.
|
||||
{Event: fwk.ClusterEvent{Resource: fwk.Pod, ActionType: fwk.Delete}},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// reservingPluginStateKey is the CycleState key under which reservingPlugin
// stores its reservingPluginState.
const reservingPluginStateKey = "PreFilterReserving"

// reservingPluginState is the per-cycle view of the plugin's reservation.
type reservingPluginState struct {
	// reserved tells Filter whether the resources count as reserved in this cycle.
	reserved bool
}
|
||||
|
||||
func (s reservingPluginState) Clone() fwk.StateData {
|
||||
return reservingPluginState{
|
||||
reserved: s.reserved,
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) PreFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
|
||||
rp.lock.Lock()
|
||||
state.Write(reservingPluginStateKey, reservingPluginState{reserved: rp.reserved})
|
||||
rp.lock.Unlock()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) Filter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
|
||||
s, err := state.Read(reservingPluginStateKey)
|
||||
if err != nil {
|
||||
return fwk.AsStatus(err)
|
||||
}
|
||||
if s.(reservingPluginState).reserved {
|
||||
return fwk.NewStatus(fwk.Unschedulable, "resources are reserved")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) Reserve(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
|
||||
if strings.Contains(p.Name, rp.nameOfPodToReserve) {
|
||||
rp.lock.Lock()
|
||||
rp.reserved = true
|
||||
rp.lock.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) Unreserve(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) {
|
||||
if strings.Contains(p.Name, rp.nameOfPodToReserve) {
|
||||
rp.lock.Lock()
|
||||
rp.reserved = false
|
||||
rp.lock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) PreFilterExtensions() fwk.PreFilterExtensions {
|
||||
return rp
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) AddPod(ctx context.Context, state fwk.CycleState, podToSchedule *v1.Pod, podInfoToAdd fwk.PodInfo, nodeInfo fwk.NodeInfo) *fwk.Status {
|
||||
if strings.Contains(podInfoToAdd.GetPod().Name, rp.nameOfPodToReserve) {
|
||||
state.Write(reservingPluginStateKey, reservingPluginState{reserved: true})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rp *reservingPlugin) RemovePod(ctx context.Context, state fwk.CycleState, podToSchedule *v1.Pod, podInfoToRemove fwk.PodInfo, nodeInfo fwk.NodeInfo) *fwk.Status {
|
||||
if strings.Contains(podInfoToRemove.GetPod().Name, rp.nameOfPodToReserve) {
|
||||
state.Write(reservingPluginStateKey, reservingPluginState{reserved: false})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ fwk.PreFilterPlugin = &reservingPlugin{}
|
||||
var _ fwk.FilterPlugin = &reservingPlugin{}
|
||||
var _ fwk.PreFilterExtensions = &reservingPlugin{}
|
||||
var _ fwk.ReservePlugin = &reservingPlugin{}
|
||||
|
|
|
|||
Loading…
Reference in a new issue