mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
Merge ce3cd58b22 into e136f39334
This commit is contained in:
commit
4b3cfe4bfc
10 changed files with 470 additions and 64 deletions
25
pkg/scheduler/backend/cache/snapshot.go
vendored
25
pkg/scheduler/backend/cache/snapshot.go
vendored
|
|
@ -480,3 +480,28 @@ func (s *Snapshot) ListNodesInPlacement() ([]fwk.NodeInfo, error) {
|
|||
}
|
||||
return s.placementNodes.nodeInfoList, nil
|
||||
}
|
||||
|
||||
// AssumeAllExistingPods assumes all pods that are already present in the snapshot.
|
||||
func (s *Snapshot) AssumeAllExistingPods() error {
|
||||
currentlyAssumed := sets.New[string]()
|
||||
revertAssumed := func() {
|
||||
for key := range currentlyAssumed {
|
||||
delete(s.assumedPods, key)
|
||||
}
|
||||
}
|
||||
|
||||
for _, nodeInfo := range s.nodeInfoList {
|
||||
for _, podInfo := range nodeInfo.GetPods() {
|
||||
p := podInfo.GetPod()
|
||||
key, err := framework.GetPodKey(p)
|
||||
if err != nil {
|
||||
revertAssumed()
|
||||
return fmt.Errorf("couldn't create key for pod: %w", err)
|
||||
}
|
||||
s.assumedPods[key] = p
|
||||
currentlyAssumed.Insert(key)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
65
pkg/scheduler/backend/cache/snapshot_test.go
vendored
65
pkg/scheduler/backend/cache/snapshot_test.go
vendored
|
|
@ -798,6 +798,71 @@ func TestSnapshot_Placement(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSnapshot_AssumeAllExistingPods(t *testing.T) {
|
||||
node1 := st.MakeNode().Name("node-1").Obj()
|
||||
node2 := st.MakeNode().Name("node-2").Obj()
|
||||
|
||||
pod1 := st.MakePod().Name("pod-1").Namespace("ns").UID("pod-1").Node("node-1").Obj()
|
||||
pod2 := st.MakePod().Name("pod-2").Namespace("ns").UID("pod-2").Node("node-1").Obj()
|
||||
pod3 := st.MakePod().Name("pod-3").Namespace("ns").UID("pod-3").Node("node-2").Obj()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
initialPods []*v1.Pod
|
||||
initialNodes []*v1.Node
|
||||
preAssumedPods map[string]*v1.Pod
|
||||
expectedAssumedPods sets.Set[string]
|
||||
}{
|
||||
{
|
||||
name: "all existing pods assumed successfully",
|
||||
initialPods: []*v1.Pod{pod1, pod2, pod3},
|
||||
initialNodes: []*v1.Node{node1, node2},
|
||||
expectedAssumedPods: sets.New("pod-1", "pod-2", "pod-3"),
|
||||
},
|
||||
{
|
||||
name: "empty snapshot, no pods assumed",
|
||||
initialPods: nil,
|
||||
initialNodes: []*v1.Node{node1, node2},
|
||||
expectedAssumedPods: sets.New[string](),
|
||||
},
|
||||
{
|
||||
name: "pods already pre-assumed are kept or overwritten",
|
||||
initialPods: []*v1.Pod{pod1},
|
||||
initialNodes: []*v1.Node{node1},
|
||||
preAssumedPods: map[string]*v1.Pod{
|
||||
"pod-2": pod2,
|
||||
},
|
||||
expectedAssumedPods: sets.New("pod-1", "pod-2"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
snapshot := NewSnapshot(tt.initialPods, tt.initialNodes)
|
||||
if tt.preAssumedPods != nil {
|
||||
for k, v := range tt.preAssumedPods {
|
||||
snapshot.assumedPods[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
err := snapshot.AssumeAllExistingPods()
|
||||
if err != nil {
|
||||
t.Fatalf("AssumeAllExistingPods() failed: %v", err)
|
||||
}
|
||||
|
||||
if len(tt.expectedAssumedPods) != len(snapshot.assumedPods) {
|
||||
t.Errorf("Unexpected number of assumed pods: want %d, got %d", len(tt.expectedAssumedPods), len(snapshot.assumedPods))
|
||||
}
|
||||
|
||||
for key := range tt.expectedAssumedPods {
|
||||
if _, ok := snapshot.assumedPods[key]; !ok {
|
||||
t.Errorf("Expected pod %q to be assumed, but it wasn't", key)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshot_BackupRestore(t *testing.T) {
|
||||
podWithAffinity := st.MakePod().Name("p-aff").Namespace("ns").UID("p-aff").PodAffinity("key", &metav1.LabelSelector{MatchLabels: map[string]string{"key": "value"}}, st.PodAffinityWithRequiredReq).Node("node-1").Obj()
|
||||
podWithAntiAffinity := st.MakePod().Name("p-anti").Namespace("ns").UID("p-anti").PodAntiAffinity("key", &metav1.LabelSelector{MatchLabels: map[string]string{"key": "value"}}, st.PodAntiAffinityWithRequiredReq).Node("node-1").Obj()
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ type CycleState struct {
|
|||
skipPreBindPlugins sets.Set[string]
|
||||
// skipAllPostFilterPlugins indicates whether to skip all plugins in the PostFilter extension point.
|
||||
skipAllPostFilterPlugins bool
|
||||
// skipAllScorePlugins indicates whether to skip all plugins in the Score extension point.
|
||||
skipAllScorePlugins bool
|
||||
// parallelPreBindPlugins is the set of plugins that can be run in parallel with other plugins
|
||||
// in the PreBind extension point.
|
||||
parallelPreBindPlugins sets.Set[string]
|
||||
|
|
@ -85,6 +87,16 @@ func (c *CycleState) GetSkipScorePlugins() sets.Set[string] {
|
|||
return c.skipScorePlugins
|
||||
}
|
||||
|
||||
// SetSkipAllScorePlugins sets whether to skip all plugins in the Score extension point.
|
||||
func (c *CycleState) SetSkipAllScorePlugins(flag bool) {
|
||||
c.skipAllScorePlugins = flag
|
||||
}
|
||||
|
||||
// ShouldSkipAllScorePlugins returns whether to skip all plugins in the Score extension point.
|
||||
func (c *CycleState) ShouldSkipAllScorePlugins() bool {
|
||||
return c.skipAllScorePlugins
|
||||
}
|
||||
|
||||
func (c *CycleState) SetSkipPreBindPlugins(plugins sets.Set[string]) {
|
||||
c.skipPreBindPlugins = plugins
|
||||
}
|
||||
|
|
@ -141,6 +153,7 @@ func (c *CycleState) Clone() fwk.CycleState {
|
|||
copy.parallelPreBindPlugins = c.parallelPreBindPlugins
|
||||
copy.podGroupCycleState = c.podGroupCycleState
|
||||
copy.skipAllPostFilterPlugins = c.skipAllPostFilterPlugins
|
||||
copy.skipAllScorePlugins = c.skipAllScorePlugins
|
||||
|
||||
return copy
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ var key fwk.StateKey = "fakedata_key"
|
|||
|
||||
// createCycleStateWithFakeData creates *CycleState with fakeData.
|
||||
// The given data is used in stored fakeData.
|
||||
func createCycleStateWithFakeData(data string, recordPluginMetrics bool, skipAllPostFilterPlugins bool, skipPlugins ...[]string) *CycleState {
|
||||
func createCycleStateWithFakeData(data string, recordPluginMetrics bool, skipAllPostFilterPlugins bool, skipAllScorePlugins bool, skipPlugins ...[]string) *CycleState {
|
||||
c := NewCycleState()
|
||||
c.Write(key, &fakeData{
|
||||
data: data,
|
||||
|
|
@ -53,6 +53,7 @@ func createCycleStateWithFakeData(data string, recordPluginMetrics bool, skipAll
|
|||
c.SetSkipScorePlugins(sets.New(skipPlugins[1]...))
|
||||
}
|
||||
c.SetSkipAllPostFilterPlugins(skipAllPostFilterPlugins)
|
||||
c.SetSkipAllScorePlugins(skipAllScorePlugins)
|
||||
return c
|
||||
}
|
||||
|
||||
|
|
@ -78,6 +79,9 @@ func isCycleStateEqual(a, b *CycleState) (bool, string) {
|
|||
if diff := cmp.Diff(a.skipAllPostFilterPlugins, b.skipAllPostFilterPlugins); diff != "" {
|
||||
return false, fmt.Sprintf("CycleState A and B have different SkipAllPostFilterPlugins sets. -wanted,+got:\n%s", diff)
|
||||
}
|
||||
if diff := cmp.Diff(a.skipAllScorePlugins, b.skipAllScorePlugins); diff != "" {
|
||||
return false, fmt.Sprintf("CycleState A and B have different SkipAllScorePlugins sets. -wanted,+got:\n%s", diff)
|
||||
}
|
||||
|
||||
var msg string
|
||||
isEqual := true
|
||||
|
|
@ -133,33 +137,38 @@ func TestCycleStateClone(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
name: "clone with recordPluginMetrics true",
|
||||
state: createCycleStateWithFakeData("data", true, false),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, false),
|
||||
state: createCycleStateWithFakeData("data", true, false, false),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, false, false),
|
||||
},
|
||||
{
|
||||
name: "clone with recordPluginMetrics false",
|
||||
state: createCycleStateWithFakeData("data", false, false),
|
||||
wantClonedState: createCycleStateWithFakeData("data", false, false),
|
||||
state: createCycleStateWithFakeData("data", false, false, false),
|
||||
wantClonedState: createCycleStateWithFakeData("data", false, false, false),
|
||||
},
|
||||
{
|
||||
name: "clone with SkipFilterPlugins",
|
||||
state: createCycleStateWithFakeData("data", true, false, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, false, []string{"p1", "p2", "p3"}),
|
||||
state: createCycleStateWithFakeData("data", true, false, false, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, false, false, []string{"p1", "p2", "p3"}),
|
||||
},
|
||||
{
|
||||
name: "clone with SkipScorePlugins",
|
||||
state: createCycleStateWithFakeData("data", false, false, []string{}, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", false, false, []string{}, []string{"p1", "p2", "p3"}),
|
||||
state: createCycleStateWithFakeData("data", false, false, false, []string{}, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", false, false, false, []string{}, []string{"p1", "p2", "p3"}),
|
||||
},
|
||||
{
|
||||
name: "clone with SkipScorePlugins and SkipFilterPlugins",
|
||||
state: createCycleStateWithFakeData("data", true, false, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, false, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
state: createCycleStateWithFakeData("data", true, false, false, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, false, false, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
},
|
||||
{
|
||||
name: "clone with SkipAllPostFilterPlugins",
|
||||
state: createCycleStateWithFakeData("data", true, true, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, true, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
state: createCycleStateWithFakeData("data", true, true, false, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, true, false, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
},
|
||||
{
|
||||
name: "clone with SkipAllScorePlugins",
|
||||
state: createCycleStateWithFakeData("data", true, false, true, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
wantClonedState: createCycleStateWithFakeData("data", true, false, true, []string{"p0"}, []string{"p1", "p2", "p3"}),
|
||||
},
|
||||
{
|
||||
name: "clone with nil CycleState",
|
||||
|
|
|
|||
|
|
@ -1346,7 +1346,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state fwk.CycleStat
|
|||
plugins := make([]fwk.ScorePlugin, 0, numPlugins)
|
||||
pluginToNodeScores := make(map[string]fwk.NodeScoreList, numPlugins)
|
||||
for _, pl := range f.scorePlugins {
|
||||
if state.GetSkipScorePlugins().Has(pl.Name()) {
|
||||
if state.ShouldSkipAllScorePlugins() || state.GetSkipScorePlugins().Has(pl.Name()) {
|
||||
continue
|
||||
}
|
||||
plugins = append(plugins, pl)
|
||||
|
|
|
|||
|
|
@ -1381,12 +1381,13 @@ func TestRunPreScorePlugins(t *testing.T) {
|
|||
|
||||
func TestRunScorePlugins(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
registry Registry
|
||||
plugins *config.Plugins
|
||||
pluginConfigs []config.PluginConfig
|
||||
want []fwk.NodePluginScores
|
||||
skippedPlugins sets.Set[string]
|
||||
name string
|
||||
registry Registry
|
||||
plugins *config.Plugins
|
||||
pluginConfigs []config.PluginConfig
|
||||
want []fwk.NodePluginScores
|
||||
skippedPlugins sets.Set[string]
|
||||
skipAllScorePlugins bool
|
||||
// If err is true, we expect RunScorePlugin to fail.
|
||||
err bool
|
||||
}{
|
||||
|
|
@ -1744,6 +1745,29 @@ func TestRunScorePlugins(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "skip all score plugins",
|
||||
plugins: buildScoreConfigDefaultWeights(scorePlugin1),
|
||||
skipAllScorePlugins: true,
|
||||
pluginConfigs: []config.PluginConfig{
|
||||
{
|
||||
Name: scorePlugin1,
|
||||
Args: &runtime.Unknown{
|
||||
Raw: []byte(`{ "scoreStatus": 1 }`), // To make sure this plugin isn't called, set error as an injected result.
|
||||
},
|
||||
},
|
||||
},
|
||||
want: []fwk.NodePluginScores{
|
||||
{
|
||||
Name: "node1",
|
||||
Scores: []fwk.PluginScore{},
|
||||
},
|
||||
{
|
||||
Name: "node2",
|
||||
Scores: []fwk.PluginScore{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
@ -1766,6 +1790,7 @@ func TestRunScorePlugins(t *testing.T) {
|
|||
|
||||
state := framework.NewCycleState()
|
||||
state.SetSkipScorePlugins(tt.skippedPlugins)
|
||||
state.SetSkipAllScorePlugins(tt.skipAllScorePlugins)
|
||||
res, status := f.RunScorePlugins(ctx, state, pod, BuildNodeInfos(nodes))
|
||||
|
||||
if tt.err {
|
||||
|
|
|
|||
|
|
@ -207,7 +207,7 @@ func (sched *Scheduler) prepareForBindingCycle(
|
|||
podsToActivate *framework.PodsToActivate,
|
||||
scheduleResult ScheduleResult,
|
||||
) (*framework.QueuedPodInfo, *fwk.Status) {
|
||||
assumedPodInfo, status := sched.assumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult)
|
||||
assumedPodInfo, status := sched.AssumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult)
|
||||
if !status.IsSuccess() {
|
||||
return assumedPodInfo, status
|
||||
}
|
||||
|
|
@ -219,7 +219,7 @@ func (sched *Scheduler) prepareForBindingCycle(
|
|||
schedFramework.AddWaitingPod(assumedPod, pluginsWaitTime)
|
||||
} else if !runPermitStatus.IsSuccess() {
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost)
|
||||
err := sched.UnreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost)
|
||||
if err != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed")
|
||||
}
|
||||
|
|
@ -251,6 +251,77 @@ func (sched *Scheduler) prepareForBindingCycle(
|
|||
return assumedPodInfo, nil
|
||||
}
|
||||
|
||||
// SchedulingTryResult holds the outcome of a TryScheduling operation.
|
||||
type SchedulingTryResult struct {
|
||||
// Pod is the pod that was tentatively scheduled.
|
||||
Pod *v1.Pod
|
||||
// ScheduleResult contains the details of the scheduling decision (e.g., suggested host).
|
||||
ScheduleResult ScheduleResult
|
||||
// Status indicates the success or failure of the scheduling operation.
|
||||
Status *fwk.Status
|
||||
// RequiresPreemption is true if the pod was only schedulable after nominating a node for preemption.
|
||||
RequiresPreemption bool
|
||||
// AssumedPodInfo contains the queued pod info after assumption and reservation.
|
||||
AssumedPodInfo *framework.QueuedPodInfo
|
||||
}
|
||||
|
||||
// TryScheduling performs a tentative scheduling of a pod by running the scheduling
|
||||
// algorithm and assuming the pod in memory.
|
||||
// It returns a revert function that can be used to undo the assumption/reservation.
|
||||
// This is primarily used in pod group scheduling to check if an entire group can fit.
|
||||
func (sched *Scheduler) TryScheduling(ctx context.Context,
|
||||
state fwk.CycleState,
|
||||
schedFramework framework.Framework,
|
||||
podInfo *framework.QueuedPodInfo,
|
||||
) (*SchedulingTryResult, func()) {
|
||||
pod := podInfo.GetPod()
|
||||
|
||||
requiresPreemption := false
|
||||
scheduleResult, status := sched.schedulingAlgorithm(ctx, state, schedFramework, podInfo, time.Now())
|
||||
if !status.IsSuccess() {
|
||||
if scheduleResult.nominatingInfo != nil && scheduleResult.nominatingInfo.NominatedNodeName != "" {
|
||||
// If the NominatedNodeName is set, the preemption is required.
|
||||
// Continue with assuming and reserving, because the subsequent pods from this group
|
||||
// have to see this one as already scheduled on its nominated place.
|
||||
// Set SuggestedHost to NominatedNodeName to handle the pod similarly to one that is feasible.
|
||||
scheduleResult.SuggestedHost = scheduleResult.nominatingInfo.NominatedNodeName
|
||||
requiresPreemption = true
|
||||
} else {
|
||||
// In case of pod being just unschedulable or having an error, just return now.
|
||||
return &SchedulingTryResult{
|
||||
Pod: pod,
|
||||
ScheduleResult: scheduleResult,
|
||||
Status: status,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
assumedPodInfo, assumeStatus := sched.AssumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult)
|
||||
if !assumeStatus.IsSuccess() {
|
||||
return &SchedulingTryResult{
|
||||
Pod: pod,
|
||||
ScheduleResult: ScheduleResult{nominatingInfo: clearNominatedNode},
|
||||
Status: assumeStatus,
|
||||
}, nil
|
||||
}
|
||||
|
||||
revertFn := func() {
|
||||
err := sched.UnreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost)
|
||||
if err != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed")
|
||||
}
|
||||
}
|
||||
|
||||
return &SchedulingTryResult{
|
||||
Pod: pod,
|
||||
ScheduleResult: scheduleResult,
|
||||
Status: status,
|
||||
RequiresPreemption: requiresPreemption,
|
||||
AssumedPodInfo: assumedPodInfo,
|
||||
}, revertFn
|
||||
|
||||
}
|
||||
|
||||
// schedulingAlgorithm runs filtering and scoring phases for a single pod,
|
||||
// together with post filter when the pod is unschedulable.
|
||||
func (sched *Scheduler) schedulingAlgorithm(
|
||||
|
|
@ -310,7 +381,7 @@ func (sched *Scheduler) schedulingAlgorithm(
|
|||
}
|
||||
|
||||
// AssumeAndReserve assumes and reserves the pod in scheduler's memory.
|
||||
func (sched *Scheduler) assumeAndReserve(
|
||||
func (sched *Scheduler) AssumeAndReserve(
|
||||
ctx context.Context,
|
||||
state fwk.CycleState,
|
||||
schedFramework framework.Framework,
|
||||
|
|
@ -336,7 +407,7 @@ func (sched *Scheduler) assumeAndReserve(
|
|||
// Run the Reserve method of reserve plugins.
|
||||
if sts := schedFramework.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
||||
// trigger un-reserve to clean up state associated with the reserved Pod
|
||||
err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost)
|
||||
err := sched.UnreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost)
|
||||
if err != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed")
|
||||
}
|
||||
|
|
@ -361,7 +432,7 @@ func (sched *Scheduler) assumeAndReserve(
|
|||
// UnreserveAndForget unreserves and forgets the pod from scheduler's memory.
|
||||
// This function shouldn't be called during binding cycle with a state, where IsPodGroupSchedulingCycle is set to true,
|
||||
// but this shouldn't happen, because such pods with such state cannot reach binding.
|
||||
func (sched *Scheduler) unreserveAndForget(
|
||||
func (sched *Scheduler) UnreserveAndForget(
|
||||
ctx context.Context,
|
||||
state fwk.CycleState,
|
||||
schedFramework framework.Framework,
|
||||
|
|
@ -514,7 +585,7 @@ func (sched *Scheduler) handleBindingCycleError(
|
|||
|
||||
assumedPod := podInfo.Pod
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil {
|
||||
if forgetErr := sched.UnreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, forgetErr, "ForgetPod failed")
|
||||
} else {
|
||||
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
|
||||
|
|
|
|||
|
|
@ -416,46 +416,21 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
|
|||
|
||||
logger.V(4).Info("Attempting to schedule a pod belonging to a pod group", "podGroup", klog.KObj(podGroupInfo), "pod", klog.KObj(pod))
|
||||
|
||||
requiresPreemption := false
|
||||
scheduleResult, status := sched.schedulingAlgorithm(ctx, podCtx.state, schedFwk, podInfo, start)
|
||||
if !status.IsSuccess() {
|
||||
if scheduleResult.nominatingInfo != nil && scheduleResult.nominatingInfo.NominatedNodeName != "" {
|
||||
// If the NominatedNodeName is set, the preemption is required.
|
||||
// Continue with assuming and reserving, because the subsequent pods from this group
|
||||
// have to see this one as already scheduled on its nominated place.
|
||||
// Set SuggestedHost to NominatedNodeName to handle the pod similarly to one that is feasible.
|
||||
scheduleResult.SuggestedHost = scheduleResult.nominatingInfo.NominatedNodeName
|
||||
requiresPreemption = true
|
||||
} else {
|
||||
// In case of pod being just unschedulable or having an error, just return now.
|
||||
return algorithmResult{
|
||||
pod: pod,
|
||||
scheduleResult: scheduleResult,
|
||||
podCtx: podCtx,
|
||||
schedulingDuration: time.Since(start),
|
||||
status: status,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
assumedPodInfo, assumeStatus := sched.assumeAndReserve(ctx, podCtx.state, schedFwk, podInfo, scheduleResult)
|
||||
if !assumeStatus.IsSuccess() {
|
||||
tryResult, revertFn := sched.TryScheduling(ctx, podCtx.state, schedFwk, podInfo)
|
||||
assumedPodInfo := tryResult.AssumedPodInfo
|
||||
schedRes := tryResult.ScheduleResult
|
||||
|
||||
if assumedPodInfo == nil {
|
||||
return algorithmResult{
|
||||
pod: pod,
|
||||
scheduleResult: ScheduleResult{nominatingInfo: clearNominatedNode},
|
||||
pod: tryResult.Pod,
|
||||
scheduleResult: tryResult.ScheduleResult,
|
||||
podCtx: podCtx,
|
||||
schedulingDuration: time.Since(start),
|
||||
status: assumeStatus,
|
||||
status: tryResult.Status,
|
||||
}, nil
|
||||
}
|
||||
|
||||
revertFn := func() {
|
||||
err := sched.unreserveAndForget(ctx, podCtx.state, schedFwk, assumedPodInfo, scheduleResult.SuggestedHost)
|
||||
if err != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed")
|
||||
}
|
||||
}
|
||||
|
||||
_, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, scheduleResult.SuggestedHost)
|
||||
_, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, schedRes.SuggestedHost)
|
||||
if !permitStatus.IsWait() && !permitStatus.IsSuccess() {
|
||||
revertFn()
|
||||
if permitStatus.IsRejected() {
|
||||
|
|
@ -466,7 +441,7 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
|
|||
NodeToStatus: framework.NewDefaultNodeToStatus(),
|
||||
},
|
||||
}
|
||||
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, permitStatus)
|
||||
fitErr.Diagnosis.NodeToStatus.Set(schedRes.SuggestedHost, permitStatus)
|
||||
fitErr.Diagnosis.AddPluginStatus(permitStatus)
|
||||
permitStatus = fwk.NewStatus(permitStatus.Code()).WithError(fitErr)
|
||||
}
|
||||
|
|
@ -481,12 +456,12 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
|
|||
|
||||
return algorithmResult{
|
||||
pod: pod,
|
||||
scheduleResult: scheduleResult,
|
||||
scheduleResult: schedRes,
|
||||
podCtx: podCtx,
|
||||
schedulingDuration: time.Since(start),
|
||||
status: status,
|
||||
status: tryResult.Status,
|
||||
permitStatus: permitStatus,
|
||||
requiresPreemption: requiresPreemption,
|
||||
requiresPreemption: tryResult.RequiresPreemption,
|
||||
}, revertFn
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4902,3 +4902,222 @@ func TestEvaluateNominatedNode(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTryScheduling(t *testing.T) {
|
||||
node1 := st.MakeNode().Name("node1").Obj()
|
||||
pod1 := st.MakePod().Name("pod1").Namespace("default").UID("pod1").Obj()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
filterStatus *fwk.Status
|
||||
postFilterStatus *fwk.Status
|
||||
postFilterResult *fwk.PostFilterResult
|
||||
reserveStatus *fwk.Status
|
||||
isPodGroupCycle bool
|
||||
wantSuccess bool
|
||||
wantSuggestedHost string
|
||||
wantAssumedInCache bool
|
||||
wantAssumedInSnap bool
|
||||
wantRequiresPreempt bool
|
||||
wantErrorMessage string
|
||||
}{
|
||||
{
|
||||
name: "success: pod fits on node",
|
||||
pod: pod1,
|
||||
filterStatus: fwk.NewStatus(fwk.Success),
|
||||
wantSuccess: true,
|
||||
wantSuggestedHost: "node1",
|
||||
wantAssumedInCache: true,
|
||||
},
|
||||
{
|
||||
name: "failure: algorithm finds no nodes",
|
||||
pod: pod1,
|
||||
filterStatus: fwk.NewStatus(fwk.Unschedulable, "fake failure"),
|
||||
postFilterStatus: fwk.NewStatus(fwk.Unschedulable),
|
||||
wantSuccess: false,
|
||||
wantErrorMessage: "fake failure",
|
||||
},
|
||||
{
|
||||
name: "preemption: algorithm fails but PostFilter nominates node",
|
||||
pod: pod1,
|
||||
filterStatus: fwk.NewStatus(fwk.Unschedulable, "fake failure"),
|
||||
postFilterStatus: fwk.NewStatus(fwk.Success),
|
||||
postFilterResult: &fwk.PostFilterResult{NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
|
||||
wantSuccess: false,
|
||||
wantRequiresPreempt: true,
|
||||
wantSuggestedHost: "node1",
|
||||
wantAssumedInCache: true,
|
||||
},
|
||||
{
|
||||
name: "reserve failure: algorithm succeeds but Reserve plugin fails",
|
||||
pod: pod1,
|
||||
filterStatus: fwk.NewStatus(fwk.Success),
|
||||
reserveStatus: fwk.NewStatus(fwk.Error, "reserve fake failure"),
|
||||
wantSuccess: false,
|
||||
wantErrorMessage: "reserve fake failure",
|
||||
wantAssumedInCache: false,
|
||||
},
|
||||
{
|
||||
name: "pod group cycle: assumed in snapshot only",
|
||||
pod: pod1,
|
||||
filterStatus: fwk.NewStatus(fwk.Success),
|
||||
isPodGroupCycle: true,
|
||||
wantSuccess: true,
|
||||
wantSuggestedHost: "node1",
|
||||
wantAssumedInCache: false,
|
||||
wantAssumedInSnap: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
client := clientsetfake.NewClientset(node1, tt.pod)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
cache := internalcache.New(ctx, nil, true)
|
||||
cache.AddNode(logger, node1)
|
||||
snapshot := internalcache.NewEmptySnapshot()
|
||||
queue := internalqueue.NewTestQueue(ctx, nil)
|
||||
|
||||
podInfo, _ := framework.NewPodInfo(tt.pod)
|
||||
queuedPodInfo := &framework.QueuedPodInfo{PodInfo: podInfo}
|
||||
|
||||
fakePlugin := &trySchedulingPlugin{
|
||||
fakePodGroupPlugin: &fakePodGroupPlugin{
|
||||
filterStatus: map[string]*fwk.Status{tt.pod.Name: tt.filterStatus},
|
||||
postFilterStatus: map[string]*fwk.Status{tt.pod.Name: tt.postFilterStatus},
|
||||
postFilterResult: map[string]*fwk.PostFilterResult{tt.pod.Name: tt.postFilterResult},
|
||||
},
|
||||
reserveStatus: tt.reserveStatus,
|
||||
}
|
||||
|
||||
registry := frameworkruntime.Registry{
|
||||
queuesort.Name: queuesort.New,
|
||||
defaultbinder.Name: defaultbinder.New,
|
||||
"TrySchedulingPlugin": func(ctx context.Context, obj runtime.Object, handle fwk.Handle) (fwk.Plugin, error) {
|
||||
return fakePlugin, nil
|
||||
},
|
||||
}
|
||||
profileCfg := schedulerapi.KubeSchedulerProfile{
|
||||
SchedulerName: "default-scheduler",
|
||||
Plugins: &schedulerapi.Plugins{
|
||||
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}}},
|
||||
Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "TrySchedulingPlugin"}}},
|
||||
PostFilter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "TrySchedulingPlugin"}}},
|
||||
Reserve: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "TrySchedulingPlugin"}}},
|
||||
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: defaultbinder.Name}}},
|
||||
},
|
||||
}
|
||||
|
||||
schedFwk, err := frameworkruntime.NewFramework(ctx, registry, &profileCfg,
|
||||
frameworkruntime.WithInformerFactory(informerFactory),
|
||||
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
||||
frameworkruntime.WithPodNominator(queue),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework: %v", err)
|
||||
}
|
||||
|
||||
sched := &Scheduler{
|
||||
Cache: cache,
|
||||
nodeInfoSnapshot: snapshot,
|
||||
Profiles: profile.Map{"default-scheduler": schedFwk},
|
||||
SchedulingQueue: queue,
|
||||
}
|
||||
sched.SchedulePod = sched.schedulePod
|
||||
|
||||
if err := sched.Cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot); err != nil {
|
||||
t.Fatalf("Failed to update snapshot: %v", err)
|
||||
}
|
||||
|
||||
state := framework.NewCycleState()
|
||||
if tt.isPodGroupCycle {
|
||||
state.SetPodGroupSchedulingCycle(framework.NewCycleState())
|
||||
}
|
||||
|
||||
tryResult, revertFn := sched.TryScheduling(ctx, state, schedFwk, queuedPodInfo)
|
||||
|
||||
// Verify Success/Failure
|
||||
if tryResult.Status.IsSuccess() != tt.wantSuccess {
|
||||
t.Errorf("tryResult.Status.IsSuccess() = %v, want %v", tryResult.Status.IsSuccess(), tt.wantSuccess)
|
||||
}
|
||||
|
||||
// Verify Error Message
|
||||
if tt.wantErrorMessage != "" && !strings.Contains(tryResult.Status.Message(), tt.wantErrorMessage) {
|
||||
t.Errorf("tryResult.Status.Message() = %q, want it to contain %q", tryResult.Status.Message(), tt.wantErrorMessage)
|
||||
}
|
||||
|
||||
// Verify Suggested Host
|
||||
if tryResult.ScheduleResult.SuggestedHost != tt.wantSuggestedHost {
|
||||
t.Errorf("tryResult.ScheduleResult.SuggestedHost = %q, want %q", tryResult.ScheduleResult.SuggestedHost, tt.wantSuggestedHost)
|
||||
}
|
||||
|
||||
// Verify RequiresPreemption
|
||||
if tryResult.RequiresPreemption != tt.wantRequiresPreempt {
|
||||
t.Errorf("tryResult.RequiresPreemption = %v, want %v", tryResult.RequiresPreemption, tt.wantRequiresPreempt)
|
||||
}
|
||||
|
||||
// Verify Assumption in Cache
|
||||
isAssumed, _ := cache.IsAssumedPod(tt.pod)
|
||||
if isAssumed != tt.wantAssumedInCache {
|
||||
t.Errorf("cache.IsAssumedPod() = %v, want %v", isAssumed, tt.wantAssumedInCache)
|
||||
}
|
||||
|
||||
// Verify Assumption in Snapshot
|
||||
inSnap := false
|
||||
if nodeInfo, err := snapshot.Get("node1"); err == nil {
|
||||
for _, p := range nodeInfo.GetPods() {
|
||||
if p.GetPod().Name == tt.pod.Name {
|
||||
inSnap = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if inSnap != tt.wantAssumedInSnap {
|
||||
t.Errorf("pod in snapshot = %v, want %v", inSnap, tt.wantAssumedInSnap)
|
||||
}
|
||||
|
||||
// Verify Revert Function
|
||||
if (revertFn != nil) != (tt.wantAssumedInCache || tt.wantAssumedInSnap) {
|
||||
t.Errorf("revertFn is nil = %v, want %v", revertFn == nil, !(tt.wantAssumedInCache || tt.wantAssumedInSnap))
|
||||
}
|
||||
|
||||
if revertFn != nil {
|
||||
revertFn()
|
||||
isAssumed, _ = cache.IsAssumedPod(tt.pod)
|
||||
if isAssumed {
|
||||
t.Errorf("pod still assumed in cache after revert")
|
||||
}
|
||||
inSnap = false
|
||||
if nodeInfo, err := snapshot.Get("node1"); err == nil {
|
||||
for _, p := range nodeInfo.GetPods() {
|
||||
if p.GetPod().Name == tt.pod.Name {
|
||||
inSnap = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if inSnap {
|
||||
t.Errorf("pod still in snapshot after revert")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// trySchedulingPlugin is a mock plugin used in TestTryScheduling to control
|
||||
// the outcome of various extension points.
|
||||
type trySchedulingPlugin struct {
|
||||
*fakePodGroupPlugin
|
||||
reserveStatus *fwk.Status
|
||||
}
|
||||
|
||||
func (p *trySchedulingPlugin) Reserve(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
|
||||
if p.reserveStatus != nil {
|
||||
return p.reserveStatus
|
||||
}
|
||||
return fwk.NewStatus(fwk.Success)
|
||||
}
|
||||
func (p *trySchedulingPlugin) Unreserve(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) {
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,6 +76,10 @@ type CycleState interface {
|
|||
// ShouldSkipAllPostFilterPlugins returns whether all plugins should be skipped in the PostFilter extension point.
|
||||
// This function is mostly for the scheduling framework runtime, plugins usually don't have to use it.
|
||||
ShouldSkipAllPostFilterPlugins() bool
|
||||
// SetSkipAllScorePlugins sets whether to skip all plugins in the Score extension point.
|
||||
SetSkipAllScorePlugins(flag bool)
|
||||
// ShouldSkipAllScorePlugins returns whether to skip all plugins in the Score extension point.
|
||||
ShouldSkipAllScorePlugins() bool
|
||||
|
||||
// Read retrieves data with the given "key" from CycleState. If the key is not
|
||||
// present, ErrNotFound is returned.
|
||||
|
|
|
|||
Loading…
Reference in a new issue