mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
Add PlacementFeasible plugin to support early gang termination
This commit is contained in:
parent
865560c3d2
commit
1e1bad1dde
7 changed files with 575 additions and 23 deletions
|
|
@ -180,6 +180,21 @@ type PodGroupPostFilterPlugin interface {
|
|||
PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc PodGroupSchedulingFunc) (*PodGroupPostFilterResult, *fwk.Status)
|
||||
}
|
||||
|
||||
// PlacementFeasiblePlugin is an interface for plugins that are called after each pod in a pod group is evaluated.
|
||||
// It is used to determine if a pod group is schedulable, may become schedulable or will not become schedulable regardless of the scheduling result of the remaining pods in the pod group.
|
||||
type PlacementFeasiblePlugin interface {
|
||||
fwk.Plugin
|
||||
|
||||
// PlacementFeasible is called after each pod in a pod group is evaluated.
|
||||
// Use placementCycleState to accumulate the results from the evaluated pods in current cycle.
|
||||
// Return Unschedulable status if the pod group cannot be scheduled in the current state, but may become schedulable once more pods are evaluated.
|
||||
// Return UnschedulableAndUnresolvable status if the pod group cannot be scheduled in the current placement.
|
||||
// The scheduler will give up this placement and won't even evaluate remaining pods. The placement will remain eligible for preemption.
|
||||
// Return Success status if the pod group can be scheduled in the current state.
|
||||
// After returning Success, the plugin should keep returning Success for the remaining pods.
|
||||
PlacementFeasible(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status
|
||||
}
|
||||
|
||||
// Framework manages the set of plugins in use by the scheduling framework.
|
||||
// Configured plugins are called at specified points in a scheduling context.
|
||||
type Framework interface {
|
||||
|
|
@ -262,6 +277,14 @@ type Framework interface {
|
|||
// This function itself will NOT create a waiting pod object and the caller should call AddWaitingPod method to do this.
|
||||
RunPermitPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (pluginsWaitTime map[string]time.Duration, status *fwk.Status)
|
||||
|
||||
// RunPlacementFeasiblePlugins runs the set of configured Permit plugins that implement PlacementFeasible interface.
|
||||
// The result will be Success if all plugins return Success.
|
||||
// The only other valid statuses are UnschedulableAndUnresolvable and Unschedulable.
|
||||
// If any plugin returns invalid status, the result will be Error and the remaining plugins won't be invoked.
|
||||
// Otherwise, if at least 1 plugin returns UnschedulableAndUnresolvable, the remaining plugins won't be invoked and the result will be UnschdulableAndUnresolvable. The placement will remain eligible for preemption.
|
||||
// Otherwise, if at least 1 plugin returns Unschedulable, the remaining plugins will be invoked and the result will be Unschedulable.
|
||||
RunPlacementFeasiblePlugins(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status
|
||||
|
||||
// AddWaitingPod creates a waiting pod instance and adds it to the framework.
|
||||
// It takes the pluginsWaitTime map returned by the RunPermitPlugins.
|
||||
// Pod will remain waiting pod for the minimum duration returned by the Permit plugins.
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha3"
|
||||
"k8s.io/klog/v2"
|
||||
fwk "k8s.io/kube-scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||
|
|
@ -54,6 +55,7 @@ type GangScheduling struct {
|
|||
var _ fwk.EnqueueExtensions = &GangScheduling{}
|
||||
var _ fwk.PreEnqueuePlugin = &GangScheduling{}
|
||||
var _ fwk.PermitPlugin = &GangScheduling{}
|
||||
var _ framework.PlacementFeasiblePlugin = &GangScheduling{}
|
||||
|
||||
// New initializes a new plugin and returns it.
|
||||
func New(_ context.Context, _ runtime.Object, fh fwk.Handle, fts feature.Features) (fwk.Plugin, error) {
|
||||
|
|
@ -182,15 +184,7 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod
|
|||
return nil, 0
|
||||
}
|
||||
|
||||
// Select a lister for the pod group state based on the currently executed scheduling phase.
|
||||
// In the pod group scheduling cycle, it reads from the snapshot.
|
||||
// Otherwise, it reads the runtime state of the pod group from the cache.
|
||||
var podGroupStateLister fwk.PodGroupStateLister
|
||||
if state.IsPodGroupSchedulingCycle() {
|
||||
podGroupStateLister = pl.snapshotLister.PodGroupStates()
|
||||
} else {
|
||||
podGroupStateLister = pl.podGroupManager.PodGroupStates()
|
||||
}
|
||||
podGroupStateLister := pl.podGroupManager.PodGroupStates()
|
||||
podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName)
|
||||
if err != nil {
|
||||
return fwk.AsStatus(err), 0
|
||||
|
|
@ -217,3 +211,71 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod
|
|||
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
const placementFeasibleStateKey = "PlacementFeasible" + Name
|
||||
|
||||
type placementFeasibleState struct {
|
||||
evaluated, succeeded int
|
||||
}
|
||||
|
||||
func (s *placementFeasibleState) Clone() fwk.StateData {
|
||||
return &placementFeasibleState{
|
||||
evaluated: s.evaluated,
|
||||
succeeded: s.succeeded,
|
||||
}
|
||||
}
|
||||
|
||||
func getPlacementFeasibleState(placementCycleState fwk.PodGroupCycleState) *placementFeasibleState {
|
||||
state, err := placementCycleState.Read(placementFeasibleStateKey)
|
||||
if err != nil {
|
||||
state = &placementFeasibleState{}
|
||||
placementCycleState.Write(placementFeasibleStateKey, state)
|
||||
}
|
||||
return state.(*placementFeasibleState)
|
||||
}
|
||||
|
||||
// PlacementFeasible is responsible for enforcing the gang's MinCount constraint.
|
||||
// The function will only return success once the gang's MinCount is satisfied or if the pod group is not using gang scheduling policy.
|
||||
// In case there are not enough remaining pods to satisfy the gang's MinCount, it returns UnschedulableAndUnresolvable which will terminate the pod group scheduling cycle early.
|
||||
func (pl *GangScheduling) PlacementFeasible(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status {
|
||||
pg, err := pl.podGroupLister.PodGroups(podGroupInfo.GetNamespace()).Get(podGroupInfo.GetName())
|
||||
if err != nil {
|
||||
return fwk.AsStatus(fmt.Errorf("failed to get podGroup %s to compute gang feasibility: %w", klog.KObj(podGroupInfo), err))
|
||||
}
|
||||
|
||||
gangPolicy := pg.Spec.SchedulingPolicy.Gang
|
||||
// This plugin only cares about pods with a Gang scheduling policy.
|
||||
if gangPolicy == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
podGroupState, err := pl.snapshotLister.PodGroupStates().Get(podGroupInfo.GetNamespace(), podGroupInfo.GetName())
|
||||
if err != nil {
|
||||
return fwk.AsStatus(fmt.Errorf("failed to get podGroup state for podGroup %s to compute gang feasibility: %w", klog.KObj(pg), err))
|
||||
}
|
||||
|
||||
// We need to keep track of how many pods have already been evaluated in the current PodGroup scheduling cycle.
|
||||
pgState := getPlacementFeasibleState(placementCycleState)
|
||||
pgState.evaluated++
|
||||
|
||||
// remaining is the number of unscheduled pods that haven't been evaluated yet in the current PodGroup scheduling cycle.
|
||||
remaining := len(podGroupInfo.GetUnscheduledPods()) - pgState.evaluated
|
||||
|
||||
// scheduled includes the pods that are assigned or assumed in the current PodGroup scheduling cycle.
|
||||
scheduled := podGroupState.ScheduledPodsCount()
|
||||
|
||||
minCount := int(gangPolicy.MinCount)
|
||||
|
||||
if remaining+scheduled < minCount {
|
||||
// minCount can't be satisfied because there are not enough remaining pods.
|
||||
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("minCount (%d) cannot be satisfied: %d scheduled, %d remaining", minCount, scheduled, remaining))
|
||||
}
|
||||
|
||||
if scheduled < minCount {
|
||||
// minCount might be satisfied once more remaining pods are evaluated.
|
||||
return fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("minCount (%d) is not yet satisfied: %d scheduled, %d remaining", minCount, scheduled, remaining))
|
||||
}
|
||||
|
||||
// minCount is satisfied.
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -167,6 +167,31 @@ func (pam *podActivatorMock) Activate(_ klog.Logger, pods map[string]*v1.Pod) {
|
|||
}
|
||||
}
|
||||
|
||||
type mockPodGroupState struct {
|
||||
fwk.PodGroupState
|
||||
scheduledPodsCount int
|
||||
}
|
||||
|
||||
func (m *mockPodGroupState) ScheduledPodsCount() int { return m.scheduledPodsCount }
|
||||
|
||||
type mockPodGroupStateLister struct {
|
||||
state *mockPodGroupState
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *mockPodGroupStateLister) Get(namespace, podGroupName string) (fwk.PodGroupState, error) {
|
||||
return m.state, m.err
|
||||
}
|
||||
|
||||
type mockSharedLister struct {
|
||||
fwk.SharedLister
|
||||
podGroupStateLister *mockPodGroupStateLister
|
||||
}
|
||||
|
||||
func (m *mockSharedLister) PodGroupStates() fwk.PodGroupStateLister {
|
||||
return m.podGroupStateLister
|
||||
}
|
||||
|
||||
func TestGangSchedulingFlow(t *testing.T) {
|
||||
gangPodGroup1 := st.MakePodGroup().Namespace("ns1").Name("pg1").TemplateRef("t1", "gang-wl").MinCount(3).Obj()
|
||||
gangPodGroup2 := st.MakePodGroup().Namespace("ns1").Name("pg2").TemplateRef("t2", "gang-wl").MinCount(4).Obj()
|
||||
|
|
@ -244,17 +269,6 @@ func TestGangSchedulingFlow(t *testing.T) {
|
|||
wantPermitStatus: nil,
|
||||
wantAllowedPods: []types.UID{"p1", "p2", "p3"},
|
||||
},
|
||||
{
|
||||
name: "final gang pod arrives at Permit during pod group scheduling cycle",
|
||||
pod: p1,
|
||||
initialPods: []*v1.Pod{p2, p3, p4, p5},
|
||||
initialPodGroups: []*schedulingapi.PodGroup{gangPodGroup1, gangPodGroup2},
|
||||
podsWaitingOnPermit: []*v1.Pod{p2, p3, p4, p5},
|
||||
isDuringPodGroupSchedulingCycle: true,
|
||||
wantPreEnqueueStatus: nil,
|
||||
wantPermitStatus: nil,
|
||||
wantAllowedPods: []types.UID{"p1", "p2", "p3"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
@ -380,3 +394,262 @@ func TestGangSchedulingFlow(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlacementFeasible(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
minCount int32
|
||||
unscheduledPods []*v1.Pod
|
||||
podStatuses []fwk.Code
|
||||
expectedStatuses []fwk.Code
|
||||
initialScheduledCount int
|
||||
}{
|
||||
{
|
||||
name: "All pods succeed, minCount met at end",
|
||||
minCount: 2,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Success,
|
||||
fwk.Success,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.Success,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "First pod fails, minCount not satisfiable",
|
||||
minCount: 3,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
st.MakePod().Name("p3").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.UnschedulableAndUnresolvable,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Second pod fails, minCount not satisfiable",
|
||||
minCount: 2,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Success,
|
||||
fwk.Unschedulable,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.UnschedulableAndUnresolvable,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Non-gang pod group ignored",
|
||||
minCount: 0, // No gang policy
|
||||
unscheduledPods: []*v1.Pod{st.MakePod().Name("p1").Obj()},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Success,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "More than minCount pods, all succeed",
|
||||
minCount: 2,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
st.MakePod().Name("p3").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Success,
|
||||
fwk.Success,
|
||||
fwk.Success,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.Success,
|
||||
fwk.Success,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "More than minCount pods, first fails",
|
||||
minCount: 2,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
st.MakePod().Name("p3").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.Success,
|
||||
fwk.Success,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.Unschedulable,
|
||||
fwk.Success,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "More than minCount pods, minCount not satisfiable",
|
||||
minCount: 2,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
st.MakePod().Name("p3").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.Unschedulable,
|
||||
fwk.Success,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.UnschedulableAndUnresolvable,
|
||||
fwk.UnschedulableAndUnresolvable,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "1 pod scheduled, 2 unscheduled pods succeed, minCount 3 met",
|
||||
minCount: 3,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Success,
|
||||
fwk.Success,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
fwk.Success,
|
||||
},
|
||||
initialScheduledCount: 1,
|
||||
},
|
||||
{
|
||||
name: "minCount already met by scheduled pods",
|
||||
minCount: 2,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.Success,
|
||||
},
|
||||
initialScheduledCount: 2,
|
||||
},
|
||||
{
|
||||
name: "1 pod scheduled, minCount 3, first unscheduled fails, not enough remaining",
|
||||
minCount: 3,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Unschedulable,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.UnschedulableAndUnresolvable,
|
||||
},
|
||||
initialScheduledCount: 1,
|
||||
},
|
||||
{
|
||||
name: "1 pod scheduled, minCount 4, first unscheduled succeeds, not enough remaining",
|
||||
minCount: 4,
|
||||
unscheduledPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").Obj(),
|
||||
st.MakePod().Name("p2").Obj(),
|
||||
},
|
||||
podStatuses: []fwk.Code{
|
||||
fwk.Success,
|
||||
},
|
||||
expectedStatuses: []fwk.Code{
|
||||
fwk.UnschedulableAndUnresolvable,
|
||||
},
|
||||
initialScheduledCount: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
|
||||
pgName := "test-pg"
|
||||
namespace := "default"
|
||||
pg := st.MakePodGroup().Namespace(namespace).Name(pgName).Obj()
|
||||
if tc.minCount > 0 {
|
||||
pg.Spec.SchedulingPolicy.Gang = &schedulingapi.GangSchedulingPolicy{MinCount: tc.minCount}
|
||||
} else {
|
||||
pg.Spec.SchedulingPolicy.Basic = &schedulingapi.BasicSchedulingPolicy{}
|
||||
}
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(pg), 0)
|
||||
informerFactory.Scheduling().V1alpha2().PodGroups().Informer()
|
||||
informerFactory.StartWithContext(ctx)
|
||||
informerFactory.WaitForCacheSyncWithContext(ctx)
|
||||
|
||||
mockState := &mockPodGroupState{scheduledPodsCount: tc.initialScheduledCount}
|
||||
mockLister := &mockSharedLister{
|
||||
podGroupStateLister: &mockPodGroupStateLister{state: mockState},
|
||||
}
|
||||
|
||||
fh, err := frameworkruntime.NewFramework(ctx, nil, nil,
|
||||
frameworkruntime.WithInformerFactory(informerFactory),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework: %v", err)
|
||||
}
|
||||
|
||||
p, err := New(ctx, nil, fh, feature.Features{EnableGangScheduling: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create plugin: %v", err)
|
||||
}
|
||||
pl := p.(*GangScheduling)
|
||||
|
||||
// Inject the mock lister
|
||||
pl.snapshotLister = mockLister
|
||||
|
||||
pgInfo := &testPodGroupInfo{
|
||||
namespace: namespace,
|
||||
name: pgName,
|
||||
unscheduledPods: tc.unscheduledPods,
|
||||
}
|
||||
|
||||
cycleState := schedulerframework.NewCycleState()
|
||||
|
||||
for i, code := range tc.podStatuses {
|
||||
if code == fwk.Success {
|
||||
mockState.scheduledPodsCount++
|
||||
}
|
||||
|
||||
gotStatus := pl.PlacementFeasible(ctx, cycleState, pgInfo)
|
||||
|
||||
if gotCode := gotStatus.Code(); gotCode != tc.expectedStatuses[i] {
|
||||
t.Errorf("Step %d: expected status %v, got %v", i, tc.expectedStatuses[i], gotCode)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type testPodGroupInfo struct {
|
||||
namespace string
|
||||
name string
|
||||
unscheduledPods []*v1.Pod
|
||||
}
|
||||
|
||||
func (t *testPodGroupInfo) GetNamespace() string { return t.namespace }
|
||||
func (t *testPodGroupInfo) GetName() string { return t.name }
|
||||
func (t *testPodGroupInfo) GetUnscheduledPods() []*v1.Pod { return t.unscheduledPods }
|
||||
|
|
|
|||
|
|
@ -78,6 +78,7 @@ type frameworkImpl struct {
|
|||
podGroupPostFilterPlugins []framework.PodGroupPostFilterPlugin
|
||||
|
||||
placementGeneratePlugins []fwk.PlacementGeneratePlugin
|
||||
placementFeasiblePlugins []framework.PlacementFeasiblePlugin
|
||||
placementScorePlugins []fwk.PlacementScorePlugin
|
||||
placementScorePluginWeight map[string]int
|
||||
|
||||
|
|
@ -487,6 +488,17 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
|
|||
}
|
||||
}
|
||||
|
||||
// Use GangScheduling plugin as the only PlacementFeasiblePlugin.
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
|
||||
if gs, ok := f.pluginsMap[names.GangScheduling]; ok {
|
||||
if p, ok := gs.(framework.PlacementFeasiblePlugin); ok {
|
||||
f.placementFeasiblePlugins = append(f.placementFeasiblePlugins, p)
|
||||
} else {
|
||||
logger.V(2).Info("GenericWorkload is enabled, but GangScheduling plugin does not fulfill PlacementFeasiblePlugin interface.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if options.captureProfile != nil {
|
||||
if len(outputProfile.PluginConfig) != 0 {
|
||||
sort.Slice(outputProfile.PluginConfig, func(i, j int) bool {
|
||||
|
|
@ -1996,6 +2008,49 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl fwk.PermitPlugin
|
|||
return status, timeout
|
||||
}
|
||||
|
||||
// RunPlacementFeasiblePlugins runs the set of configured Permit plugins that implement PlacementFeasible interface.
|
||||
// The result will be Success if all plugins return Success.
|
||||
// The only other valid statuses are UnschedulableAndUnresolvable and Unschedulable.
|
||||
// If any plugin returns invalid status, the result will be Error and the remaining plugins won't be invoked.
|
||||
// Otherwise, if at least 1 plugin returns UnschedulableAndUnresolvable, the remaining plugins won't be invoked and the result will be UnschdulableAndUnresolvable.
|
||||
// Otherwise, if at least 1 plugin returns Unschedulable, the remaining plugins will be invoked and the result will be Unschedulable.
|
||||
func (f *frameworkImpl) RunPlacementFeasiblePlugins(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) (status *fwk.Status) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PlacementFeasible, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
|
||||
for _, pl := range f.placementFeasiblePlugins {
|
||||
plStatus := f.runPlacementFeasiblePlugin(ctx, pl, placementCycleState, podGroupInfo)
|
||||
if plStatus.IsSuccess() {
|
||||
continue
|
||||
}
|
||||
if plStatus.Code() == fwk.Unschedulable {
|
||||
status = plStatus.WithPlugin(pl.Name())
|
||||
continue
|
||||
}
|
||||
if plStatus.Code() == fwk.UnschedulableAndUnresolvable {
|
||||
return plStatus.WithPlugin(pl.Name())
|
||||
}
|
||||
if plStatus.IsError() {
|
||||
return fwk.AsStatus(fmt.Errorf("running PlacementFeasible plugin: %w", plStatus.AsError())).WithPlugin(pl.Name())
|
||||
}
|
||||
return fwk.AsStatus(fmt.Errorf("unexpected status from PlacementFeasible plugin: %v", plStatus.Code())).WithPlugin(pl.Name())
|
||||
}
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
func (f *frameworkImpl) runPlacementFeasiblePlugin(ctx context.Context, pl framework.PlacementFeasiblePlugin, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo) *fwk.Status {
|
||||
if !state.ShouldRecordPluginMetrics() {
|
||||
return pl.PlacementFeasible(ctx, state, podGroup)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.PlacementFeasible(ctx, state, podGroup)
|
||||
f.metricsRecorder.ObservePluginDurationAsync(metrics.PlacementFeasible, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// AddWaitingPod creates a waiting pod instance and adds it to the framework.
|
||||
// It takes the pluginsWaitTime map returned by the RunPermitPlugins.
|
||||
// Pod will remain waiting pod for the minimum duration returned by the Permit plugins.
|
||||
|
|
|
|||
|
|
@ -228,6 +228,10 @@ func (pl *TestPlugin) PreFilter(ctx context.Context, state fwk.CycleState, p *v1
|
|||
return pl.inj.PreFilterResult, fwk.NewStatus(fwk.Code(pl.inj.PreFilterStatus), injectReason)
|
||||
}
|
||||
|
||||
func (pl *TestPlugin) PlacementFeasible(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo) *fwk.Status {
|
||||
return fwk.NewStatus(fwk.Code(pl.inj.PlacementFeasibleStatus), injectReason)
|
||||
}
|
||||
|
||||
func (pl *TestPlugin) PreFilterExtensions() fwk.PreFilterExtensions {
|
||||
return pl
|
||||
}
|
||||
|
|
@ -783,6 +787,116 @@ func TestPodGroupPostFilterPlugins(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
type mockPlacementFeasiblePlugin struct {
|
||||
name string
|
||||
status *fwk.Status
|
||||
called bool
|
||||
}
|
||||
|
||||
func (p *mockPlacementFeasiblePlugin) Name() string { return p.name }
|
||||
|
||||
func (p *mockPlacementFeasiblePlugin) PlacementFeasible(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo) *fwk.Status {
|
||||
p.called = true
|
||||
return p.status
|
||||
}
|
||||
|
||||
func TestRunPlacementFeasiblePlugins(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
plugins []*mockPlacementFeasiblePlugin
|
||||
expectedStatus *fwk.Status
|
||||
expectedCalled []bool
|
||||
}{
|
||||
{
|
||||
name: "All plugins succeed",
|
||||
plugins: []*mockPlacementFeasiblePlugin{
|
||||
{name: "p1", status: nil},
|
||||
{name: "p2", status: nil},
|
||||
},
|
||||
expectedStatus: nil,
|
||||
expectedCalled: []bool{true, true},
|
||||
},
|
||||
{
|
||||
name: "First plugin returns Unschedulable, continues",
|
||||
plugins: []*mockPlacementFeasiblePlugin{
|
||||
{name: "p1", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable")},
|
||||
{name: "p2", status: nil},
|
||||
},
|
||||
expectedStatus: fwk.NewStatus(fwk.Unschedulable, "unschedulable").WithPlugin("p1"),
|
||||
expectedCalled: []bool{true, true},
|
||||
},
|
||||
{
|
||||
name: "First plugin returns UnschedulableAndUnresolvable, breaks",
|
||||
plugins: []*mockPlacementFeasiblePlugin{
|
||||
{name: "p1", status: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable")},
|
||||
{name: "p2", status: nil},
|
||||
},
|
||||
expectedStatus: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable").WithPlugin("p1"),
|
||||
expectedCalled: []bool{true, false},
|
||||
},
|
||||
{
|
||||
name: "First plugin returns Unschedulable, second returns UnschedulableAndUnresolvable, returns unresolvable",
|
||||
plugins: []*mockPlacementFeasiblePlugin{
|
||||
{name: "p1", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable")},
|
||||
{name: "p2", status: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable")},
|
||||
},
|
||||
expectedStatus: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable").WithPlugin("p2"),
|
||||
expectedCalled: []bool{true, true},
|
||||
},
|
||||
{
|
||||
name: "First plugin returns Unschedulable, second returns Unschedulable, returns last unschedulable",
|
||||
plugins: []*mockPlacementFeasiblePlugin{
|
||||
{name: "p1", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable1")},
|
||||
{name: "p2", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable2")},
|
||||
},
|
||||
expectedStatus: fwk.NewStatus(fwk.Unschedulable, "unschedulable2").WithPlugin("p2"),
|
||||
expectedCalled: []bool{true, true},
|
||||
},
|
||||
{
|
||||
name: "Plugin returns Error, breaks",
|
||||
plugins: []*mockPlacementFeasiblePlugin{
|
||||
{name: "p1", status: fwk.NewStatus(fwk.Error, "error")},
|
||||
{name: "p2", status: nil},
|
||||
},
|
||||
expectedStatus: fwk.AsStatus(fmt.Errorf("running PlacementFeasible plugin: %w", errors.New("error"))).WithPlugin("p1"),
|
||||
expectedCalled: []bool{true, false},
|
||||
},
|
||||
{
|
||||
name: "Plugin returns unexpected status, breaks",
|
||||
plugins: []*mockPlacementFeasiblePlugin{
|
||||
{name: "p1", status: fwk.NewStatus(fwk.Skip, "error")},
|
||||
{name: "p2", status: nil},
|
||||
},
|
||||
expectedStatus: fwk.AsStatus(fmt.Errorf("unexpected status from PlacementFeasible plugin: Skip")).WithPlugin("p1"),
|
||||
expectedCalled: []bool{true, false},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
f := &frameworkImpl{
|
||||
placementFeasiblePlugins: make([]framework.PlacementFeasiblePlugin, len(tc.plugins)),
|
||||
}
|
||||
for i, p := range tc.plugins {
|
||||
f.placementFeasiblePlugins[i] = p
|
||||
}
|
||||
|
||||
status := f.RunPlacementFeasiblePlugins(ctx, framework.NewCycleState(), nil)
|
||||
|
||||
if diff := cmp.Diff(tc.expectedStatus, status, statusCmpOpts...); diff != "" {
|
||||
t.Errorf("Unexpected status (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
for i, p := range tc.plugins {
|
||||
if p.called != tc.expectedCalled[i] {
|
||||
t.Errorf("Expected plugin %s called=%v, got %v", p.name, tc.expectedCalled[i], p.called)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewFrameworkMultiPointExpansion(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
@ -3557,6 +3671,14 @@ func TestRecordingMetrics(t *testing.T) {
|
|||
wantExtensionPoint: "PlacementScore",
|
||||
wantStatus: fwk.Success,
|
||||
},
|
||||
{
|
||||
name: "PlacementFeasible - Success",
|
||||
action: func(ctx context.Context, f framework.Framework) {
|
||||
f.RunPlacementFeasiblePlugins(ctx, state, nil)
|
||||
},
|
||||
wantExtensionPoint: "PlacementFeasible",
|
||||
wantStatus: fwk.Success,
|
||||
},
|
||||
|
||||
{
|
||||
name: "PreFilter - Error",
|
||||
|
|
@ -3634,6 +3756,15 @@ func TestRecordingMetrics(t *testing.T) {
|
|||
wantExtensionPoint: "PlacementScore",
|
||||
wantStatus: fwk.Error,
|
||||
},
|
||||
{
|
||||
name: "PlacementFeasible - Error",
|
||||
action: func(ctx context.Context, f framework.Framework) {
|
||||
f.RunPlacementFeasiblePlugins(ctx, state, nil)
|
||||
},
|
||||
inject: injectedResult{PlacementFeasibleStatus: int(fwk.Error)},
|
||||
wantExtensionPoint: "PlacementFeasible",
|
||||
wantStatus: fwk.Error,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
@ -3683,6 +3814,10 @@ func TestRecordingMetrics(t *testing.T) {
|
|||
_ = f.Close()
|
||||
}()
|
||||
|
||||
if tt.wantExtensionPoint == "PlacementFeasible" {
|
||||
f.(*frameworkImpl).placementFeasiblePlugins = []framework.PlacementFeasiblePlugin{plugin}
|
||||
}
|
||||
|
||||
tt.action(ctx, f)
|
||||
|
||||
// Stop the goroutine which records metrics and ensure it's stopped.
|
||||
|
|
@ -4074,6 +4209,7 @@ type injectedResult struct {
|
|||
GeneratePlacementsResult []*fwk.Placement `json:"generatePlacementsResult,omitempty"`
|
||||
GeneratePlacementsStatus int `json:"generatePlacementsStatus,omitempty"`
|
||||
PlacementScoreStatus int `json:"placementScoreStatus,omitempty"`
|
||||
PlacementFeasibleStatus int `json:"placementFeasibleStatus,omitempty"`
|
||||
}
|
||||
|
||||
func setScoreRes(inj injectedResult) (int64, *fwk.Status) {
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ var ExtensionPoints = []string{
|
|||
Permit,
|
||||
Sign,
|
||||
PlacementGenerate,
|
||||
PlacementFeasible,
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
@ -85,6 +86,7 @@ const (
|
|||
Permit = "Permit"
|
||||
Sign = "Sign"
|
||||
PlacementGenerate = "PlacementGenerate"
|
||||
PlacementFeasible = "PlacementFeasible"
|
||||
PlacementScore = "PlacementScore"
|
||||
PlacementScoreExtensionNormalize = "PlacementScoreExtensionNormalize"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -830,11 +830,12 @@ func TestPostFilterInvocationCount(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// 5. Verify that MockPostFilter was called exactly 3 times
|
||||
// It should be called for each pod from pod group in pod group cycle
|
||||
// 5. Verify that MockPostFilter was called exactly once
|
||||
// It should be called for each evaluated pod from pod group in pod group cycle
|
||||
// but should not be called in WAP.
|
||||
// Only one pod is evaluated for pod group because minCount=3 can't be satisfied with the remaining 2 pods.
|
||||
err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, 10*time.Second, false, func(ctx context.Context) (bool, error) {
|
||||
if mockPlugin.count == 3 {
|
||||
if mockPlugin.count == 1 {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
|
|
|
|||
Loading…
Reference in a new issue