diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 839b8ca1704..c4d42553203 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -22,6 +22,7 @@ import ( "strings" v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/sets" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/utils/ptr" ) var _ fwk.PreFilterPlugin = &Fit{} @@ -314,10 +316,16 @@ func (f *Fit) EventsToRegister(_ context.Context) ([]fwk.ClusterEventWithHint, e nodeActionType = fwk.Add | fwk.UpdateNodeAllocatable } - return []fwk.ClusterEventWithHint{ + events := []fwk.ClusterEventWithHint{ {Event: fwk.ClusterEvent{Resource: fwk.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodEvent}, {Event: fwk.ClusterEvent{Resource: fwk.Node, ActionType: nodeActionType}, QueueingHintFn: f.isSchedulableAfterNodeChange}, - }, nil + } + if f.enableDRAExtendedResource { + events = append(events, + // A pod might be waiting for an exteneded resurce from a class to get created or modified. + fwk.ClusterEventWithHint{Event: fwk.ClusterEvent{Resource: fwk.DeviceClass, ActionType: fwk.Add | fwk.Update}, QueueingHintFn: f.isSchedulableAfterDeviceClassEvent}) + } + return events, nil } // isSchedulableAfterPodEvent is invoked whenever a pod deleted or scaled down. It checks whether @@ -425,6 +433,7 @@ func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldO EnablePodLevelResources: f.enablePodLevelResources, EnableDRAExtendedResource: f.enableDRAExtendedResource, } + // Leaving in the queue, since the pod won't fit into the modified node anyway. if !isFit(pod, modifiedNode, draManager, opts) { logger.V(5).Info("node was created or updated, but it doesn't have enough resource(s) to accommodate this pod", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) @@ -445,6 +454,37 @@ func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldO return fwk.Queue, nil } +// isSchedulableAfterDeviceClassChange is invoked whenever a device class added or changed. It checks whether +// that change could make a previously unschedulable pod schedulable. +func (f *Fit) isSchedulableAfterDeviceClassEvent(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) { + originalClass, modifiedClass, err := schedutil.As[*resourceapi.DeviceClass](oldObj, newObj) + if err != nil { + return fwk.Queue, err + } + if originalClass != nil { + if ptr.Deref(originalClass.Spec.ExtendedResourceName, "") == ptr.Deref(modifiedClass.Spec.ExtendedResourceName, "") { + logger.V(5).Info("device class has identical extended resource name", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass)) + return fwk.QueueSkip, nil + } + } else { + // only check implicit extended resource name for Add, as device class name does not change during Update. + reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) + if _, ok := reqs[v1.ResourceName(resourceapi.ResourceDeviceClassPrefix+modifiedClass.Name)]; ok { + logger.V(5).Info("device class was added, and may now fit the pod's resource requests", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass)) + return fwk.Queue, nil + } + } + if modifiedClass.Spec.ExtendedResourceName != nil { + reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) + if _, ok := reqs[v1.ResourceName(*modifiedClass.Spec.ExtendedResourceName)]; ok { + logger.V(5).Info("device class was created or updated, and may fit the pod's resoruce requests", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass)) + return fwk.Queue, nil + } + } + logger.V(5).Info("created or updated deivce class extended resource name is either nil, or does not match pod's resource request", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass)) + return fwk.QueueSkip, nil +} + // haveAnyRequestedResourcesIncreased returns true if any of the resources requested by the pod have increased or if allowed pod number increased. func haveAnyRequestedResourcesIncreased(pod *v1.Pod, originalNode, modifiedNode *v1.Node, draManager fwk.SharedDRAManager, opts ResourceRequestsOptions) bool { podRequest := computePodResourceRequest(pod, opts) diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index 2be8ed60457..8088591815b 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -48,6 +48,7 @@ import ( st "k8s.io/kubernetes/pkg/scheduler/testing" tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" + "k8s.io/utils/ptr" ) var ( @@ -1416,6 +1417,7 @@ func TestEventsToRegister(t *testing.T) { name string enableInPlacePodVerticalScaling bool enableSchedulingQueueHint bool + enableDRAExtendedResource bool expectedClusterEvents []fwk.ClusterEventWithHint }{ { @@ -1442,11 +1444,28 @@ func TestEventsToRegister(t *testing.T) { {Event: fwk.ClusterEvent{Resource: "Node", ActionType: fwk.Add | fwk.UpdateNodeAllocatable | fwk.UpdateNodeTaint | fwk.UpdateNodeLabel}}, }, }, + { + name: "Register events with DRAExtendedResource feature enabled", + enableDRAExtendedResource: true, + expectedClusterEvents: []fwk.ClusterEventWithHint{ + {Event: fwk.ClusterEvent{Resource: "Pod", ActionType: fwk.Delete}}, + {Event: fwk.ClusterEvent{Resource: "Node", ActionType: fwk.Add | fwk.UpdateNodeAllocatable | fwk.UpdateNodeTaint | fwk.UpdateNodeLabel}}, + {Event: fwk.ClusterEvent{Resource: fwk.DeviceClass, ActionType: fwk.Add | fwk.Update}}, + }, + }, + { + name: "Register events with DRAExtendedResource feature disabled", + enableDRAExtendedResource: false, + expectedClusterEvents: []fwk.ClusterEventWithHint{ + {Event: fwk.ClusterEvent{Resource: "Pod", ActionType: fwk.Delete}}, + {Event: fwk.ClusterEvent{Resource: "Node", ActionType: fwk.Add | fwk.UpdateNodeAllocatable | fwk.UpdateNodeTaint | fwk.UpdateNodeLabel}}, + }, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fp := &Fit{enableInPlacePodVerticalScaling: test.enableInPlacePodVerticalScaling, enableSchedulingQueueHint: test.enableSchedulingQueueHint} + fp := &Fit{enableInPlacePodVerticalScaling: test.enableInPlacePodVerticalScaling, enableSchedulingQueueHint: test.enableSchedulingQueueHint, enableDRAExtendedResource: test.enableDRAExtendedResource} _, ctx := ktesting.NewTestContext(t) actualClusterEvents, err := fp.EventsToRegister(ctx) if err != nil { @@ -1682,6 +1701,155 @@ func Test_isSchedulableAfterNodeChange(t *testing.T) { } } +func Test_isSchedulableAfterDeviceClassChange(t *testing.T) { + ern := "example.com/gpu" + testcases := map[string]struct { + pod *v1.Pod + oldObj, newObj any + expectedHint fwk.QueueingHint + expectedErr bool + }{ + "backoff-wrong-new-object": { + pod: &v1.Pod{}, + newObj: "not-a-class", + expectedHint: fwk.Queue, + expectedErr: true, + }, + "backoff-wrong-old-object": { + pod: &v1.Pod{}, + oldObj: "not-a-class", + newObj: &resourceapi.DeviceClass{}, + expectedHint: fwk.Queue, + expectedErr: true, + }, + "skip-queue-on-class-nil-extended-resource-name-pointer": { + pod: newResourcePod(framework.Resource{Memory: 2}), + newObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{}, + }, + oldObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{}, + }, + expectedHint: fwk.QueueSkip, + }, + "skip-queue-on-class-same-extended-resource-name-pointer": { + pod: newResourcePod(framework.Resource{Memory: 2}), + newObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{ + ExtendedResourceName: &ern, + }, + }, + oldObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{ + ExtendedResourceName: &ern, + }, + }, + expectedHint: fwk.QueueSkip, + }, + "skip-queue-on-class-same-extended-resource-name": { + pod: newResourcePod(framework.Resource{Memory: 2}), + newObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + oldObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + expectedHint: fwk.QueueSkip, + }, + "queue-on-class-add-with-implicit-extended-resource-name": { + pod: newResourcePod(framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{"deviceclass.resource.kubernetes.io/gpuclass": 1}, + }), + newObj: &resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gpuclass", + }, + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + expectedHint: fwk.Queue, + }, + "skip-on-class-add-with-implicit-extended-resource-name-not-matching": { + pod: newResourcePod(framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{"deviceclass.resource.kubernetes.io/gpuclass": 1}, + }), + newObj: &resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myclass", + }, + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + expectedHint: fwk.QueueSkip, + }, + "skip-on-class-add-with-explicit-extended-resource-name-not-matching": { + pod: newResourcePod(framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{"example.com/othergpu": 1}, + }), + newObj: &resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myclass", + }, + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + expectedHint: fwk.QueueSkip, + }, + "skip-on-class-update-with-implicit-extended-resource-name": { + pod: newResourcePod(framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{"deviceclass.resource.kubernetes.io/gpuclass": 1}, + }), + newObj: &resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gpuclass", + }, + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + oldObj: &resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gpuclass", + }, + }, + expectedHint: fwk.QueueSkip, + }, + "queue-on-class-add-with-extended-resource-name": { + pod: newResourcePod(framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{"example.com/gpu": 1}, + }), + newObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + expectedHint: fwk.Queue, + }, + "queue-on-class-update-with-extended-resource-name": { + pod: newResourcePod(framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{"example.com/gpu": 1}, + }), + newObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}, + }, + oldObj: &resourceapi.DeviceClass{ + Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu1")}, + }, + expectedHint: fwk.Queue, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{}) + if err != nil { + t.Fatal(err) + } + actualHint, err := p.(*Fit).isSchedulableAfterDeviceClassEvent(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +} + func TestIsFit(t *testing.T) { testCases := map[string]struct { pod *v1.Pod diff --git a/test/integration/scheduler/queueing/queue.go b/test/integration/scheduler/queueing/queue.go index 2a697520c71..1acd94a079a 100644 --- a/test/integration/scheduler/queueing/queue.go +++ b/test/integration/scheduler/queueing/queue.go @@ -462,6 +462,53 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{ EnableSchedulingQueueHint: sets.New(true), EnableDRAExtendedResource: true, }, + { + Name: "Pod rejected by the NodeResourcesFit plugin is requeued when created DeviceClass having the extended resource matching pod's requests, and DRAExtendedResource is enabled", + EnablePlugins: []string{names.NodeResourcesFit}, + InitialNodes: []*v1.Node{ + st.MakeNode().Name("fake-node1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), + }, + Pods: []*v1.Pod{ + // - Pod1 requests available amount of CPU (in fake-node1), but will be rejected due to lack of extended resource exampe.com/gpu. + st.MakePod().Name("pod1").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/gpu": "1"}).Container("image").Obj(), + st.MakePod().Name("pod2").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/othergpu": "1"}).Container("image").Obj(), + }, + TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) { + // Trigger a DeviceClass Create event that has the extended resource name that matches pod's resource request. + if _, err := testCtx.ClientSet.ResourceV1().DeviceClasses().Create(testCtx.Ctx, &resourceapi.DeviceClass{ObjectMeta: metav1.ObjectMeta{Name: "fake-class"}, Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}}, metav1.CreateOptions{}); err != nil { + return nil, fmt.Errorf("failed to create the fake-class: %w", err) + } + + return map[fwk.ClusterEvent]uint64{{Resource: fwk.DeviceClass, ActionType: fwk.Add}: 1}, nil + }, + WantRequeuedPods: sets.New("pod1"), + EnableSchedulingQueueHint: sets.New(true), + EnableDRAExtendedResource: true, + }, + { + Name: "Pod rejected by the NodeResourcesFit plugin is requeued when updated DeviceClass has the extended resource, and DRAExtendedResource is enabled", + EnablePlugins: []string{names.NodeResourcesFit}, + InitialDeviceClasses: []*resourceapi.DeviceClass{{ObjectMeta: metav1.ObjectMeta{Name: "fake-class"}, Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: nil}}}, + InitialNodes: []*v1.Node{ + st.MakeNode().Name("fake-node1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), + }, + Pods: []*v1.Pod{ + // - Pod1 requests available amount of CPU (in fake-node1), but will be rejected due to lack of extended resource example.com/gpu. + st.MakePod().Name("pod1").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/gpu": "1"}).Container("image").Obj(), + st.MakePod().Name("pod2").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/othergpu": "1"}).Container("image").Obj(), + }, + TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) { + // Trigger a DeviceClass Update event that adds the extended resource name that matches pod's resource request. + if _, err := testCtx.ClientSet.ResourceV1().DeviceClasses().Update(testCtx.Ctx, &resourceapi.DeviceClass{ObjectMeta: metav1.ObjectMeta{Name: "fake-class"}, Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}}, metav1.UpdateOptions{}); err != nil { + return nil, fmt.Errorf("failed to update the fake-class: %w", err) + } + + return map[fwk.ClusterEvent]uint64{{Resource: fwk.DeviceClass, ActionType: fwk.Update}: 1}, nil + }, + WantRequeuedPods: sets.New("pod1"), + EnableSchedulingQueueHint: sets.New(true), + EnableDRAExtendedResource: true, + }, { Name: "Pod rejected by the NodeResourcesFit plugin isn't requeued when a Node is updated without increase in the requested resources", EnablePlugins: []string{names.NodeResourcesFit, names.NodeAffinity},