Merge pull request #134991 from yliaog/class_events

added device class add/update events to noderesources plugin when DRAExtendedResource feature is enabled
This commit is contained in:
Kubernetes Prow Robot 2025-11-05 14:16:51 -08:00 committed by GitHub
commit 738475f9e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 258 additions and 3 deletions

View file

@ -22,6 +22,7 @@ import (
"strings"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
@ -35,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/utils/ptr"
)
var _ fwk.PreFilterPlugin = &Fit{}
@ -314,10 +316,16 @@ func (f *Fit) EventsToRegister(_ context.Context) ([]fwk.ClusterEventWithHint, e
nodeActionType = fwk.Add | fwk.UpdateNodeAllocatable
}
return []fwk.ClusterEventWithHint{
events := []fwk.ClusterEventWithHint{
{Event: fwk.ClusterEvent{Resource: fwk.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodEvent},
{Event: fwk.ClusterEvent{Resource: fwk.Node, ActionType: nodeActionType}, QueueingHintFn: f.isSchedulableAfterNodeChange},
}, nil
}
if f.enableDRAExtendedResource {
events = append(events,
// A pod might be waiting for an exteneded resurce from a class to get created or modified.
fwk.ClusterEventWithHint{Event: fwk.ClusterEvent{Resource: fwk.DeviceClass, ActionType: fwk.Add | fwk.Update}, QueueingHintFn: f.isSchedulableAfterDeviceClassEvent})
}
return events, nil
}
// isSchedulableAfterPodEvent is invoked whenever a pod deleted or scaled down. It checks whether
@ -425,6 +433,7 @@ func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldO
EnablePodLevelResources: f.enablePodLevelResources,
EnableDRAExtendedResource: f.enableDRAExtendedResource,
}
// Leaving in the queue, since the pod won't fit into the modified node anyway.
if !isFit(pod, modifiedNode, draManager, opts) {
logger.V(5).Info("node was created or updated, but it doesn't have enough resource(s) to accommodate this pod", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
@ -445,6 +454,37 @@ func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldO
return fwk.Queue, nil
}
// isSchedulableAfterDeviceClassChange is invoked whenever a device class added or changed. It checks whether
// that change could make a previously unschedulable pod schedulable.
func (f *Fit) isSchedulableAfterDeviceClassEvent(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) {
originalClass, modifiedClass, err := schedutil.As[*resourceapi.DeviceClass](oldObj, newObj)
if err != nil {
return fwk.Queue, err
}
if originalClass != nil {
if ptr.Deref(originalClass.Spec.ExtendedResourceName, "") == ptr.Deref(modifiedClass.Spec.ExtendedResourceName, "") {
logger.V(5).Info("device class has identical extended resource name", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass))
return fwk.QueueSkip, nil
}
} else {
// only check implicit extended resource name for Add, as device class name does not change during Update.
reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
if _, ok := reqs[v1.ResourceName(resourceapi.ResourceDeviceClassPrefix+modifiedClass.Name)]; ok {
logger.V(5).Info("device class was added, and may now fit the pod's resource requests", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass))
return fwk.Queue, nil
}
}
if modifiedClass.Spec.ExtendedResourceName != nil {
reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
if _, ok := reqs[v1.ResourceName(*modifiedClass.Spec.ExtendedResourceName)]; ok {
logger.V(5).Info("device class was created or updated, and may fit the pod's resoruce requests", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass))
return fwk.Queue, nil
}
}
logger.V(5).Info("created or updated deivce class extended resource name is either nil, or does not match pod's resource request", "pod", klog.KObj(pod), "deviceclass", klog.KObj(modifiedClass))
return fwk.QueueSkip, nil
}
// haveAnyRequestedResourcesIncreased returns true if any of the resources requested by the pod have increased or if allowed pod number increased.
func haveAnyRequestedResourcesIncreased(pod *v1.Pod, originalNode, modifiedNode *v1.Node, draManager fwk.SharedDRAManager, opts ResourceRequestsOptions) bool {
podRequest := computePodResourceRequest(pod, opts)

View file

@ -48,6 +48,7 @@ import (
st "k8s.io/kubernetes/pkg/scheduler/testing"
tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/utils/ptr"
)
var (
@ -1416,6 +1417,7 @@ func TestEventsToRegister(t *testing.T) {
name string
enableInPlacePodVerticalScaling bool
enableSchedulingQueueHint bool
enableDRAExtendedResource bool
expectedClusterEvents []fwk.ClusterEventWithHint
}{
{
@ -1442,11 +1444,28 @@ func TestEventsToRegister(t *testing.T) {
{Event: fwk.ClusterEvent{Resource: "Node", ActionType: fwk.Add | fwk.UpdateNodeAllocatable | fwk.UpdateNodeTaint | fwk.UpdateNodeLabel}},
},
},
{
name: "Register events with DRAExtendedResource feature enabled",
enableDRAExtendedResource: true,
expectedClusterEvents: []fwk.ClusterEventWithHint{
{Event: fwk.ClusterEvent{Resource: "Pod", ActionType: fwk.Delete}},
{Event: fwk.ClusterEvent{Resource: "Node", ActionType: fwk.Add | fwk.UpdateNodeAllocatable | fwk.UpdateNodeTaint | fwk.UpdateNodeLabel}},
{Event: fwk.ClusterEvent{Resource: fwk.DeviceClass, ActionType: fwk.Add | fwk.Update}},
},
},
{
name: "Register events with DRAExtendedResource feature disabled",
enableDRAExtendedResource: false,
expectedClusterEvents: []fwk.ClusterEventWithHint{
{Event: fwk.ClusterEvent{Resource: "Pod", ActionType: fwk.Delete}},
{Event: fwk.ClusterEvent{Resource: "Node", ActionType: fwk.Add | fwk.UpdateNodeAllocatable | fwk.UpdateNodeTaint | fwk.UpdateNodeLabel}},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fp := &Fit{enableInPlacePodVerticalScaling: test.enableInPlacePodVerticalScaling, enableSchedulingQueueHint: test.enableSchedulingQueueHint}
fp := &Fit{enableInPlacePodVerticalScaling: test.enableInPlacePodVerticalScaling, enableSchedulingQueueHint: test.enableSchedulingQueueHint, enableDRAExtendedResource: test.enableDRAExtendedResource}
_, ctx := ktesting.NewTestContext(t)
actualClusterEvents, err := fp.EventsToRegister(ctx)
if err != nil {
@ -1682,6 +1701,155 @@ func Test_isSchedulableAfterNodeChange(t *testing.T) {
}
}
func Test_isSchedulableAfterDeviceClassChange(t *testing.T) {
ern := "example.com/gpu"
testcases := map[string]struct {
pod *v1.Pod
oldObj, newObj any
expectedHint fwk.QueueingHint
expectedErr bool
}{
"backoff-wrong-new-object": {
pod: &v1.Pod{},
newObj: "not-a-class",
expectedHint: fwk.Queue,
expectedErr: true,
},
"backoff-wrong-old-object": {
pod: &v1.Pod{},
oldObj: "not-a-class",
newObj: &resourceapi.DeviceClass{},
expectedHint: fwk.Queue,
expectedErr: true,
},
"skip-queue-on-class-nil-extended-resource-name-pointer": {
pod: newResourcePod(framework.Resource{Memory: 2}),
newObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{},
},
oldObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{},
},
expectedHint: fwk.QueueSkip,
},
"skip-queue-on-class-same-extended-resource-name-pointer": {
pod: newResourcePod(framework.Resource{Memory: 2}),
newObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{
ExtendedResourceName: &ern,
},
},
oldObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{
ExtendedResourceName: &ern,
},
},
expectedHint: fwk.QueueSkip,
},
"skip-queue-on-class-same-extended-resource-name": {
pod: newResourcePod(framework.Resource{Memory: 2}),
newObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
oldObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
expectedHint: fwk.QueueSkip,
},
"queue-on-class-add-with-implicit-extended-resource-name": {
pod: newResourcePod(framework.Resource{
ScalarResources: map[v1.ResourceName]int64{"deviceclass.resource.kubernetes.io/gpuclass": 1},
}),
newObj: &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
Name: "gpuclass",
},
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
expectedHint: fwk.Queue,
},
"skip-on-class-add-with-implicit-extended-resource-name-not-matching": {
pod: newResourcePod(framework.Resource{
ScalarResources: map[v1.ResourceName]int64{"deviceclass.resource.kubernetes.io/gpuclass": 1},
}),
newObj: &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
Name: "myclass",
},
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
expectedHint: fwk.QueueSkip,
},
"skip-on-class-add-with-explicit-extended-resource-name-not-matching": {
pod: newResourcePod(framework.Resource{
ScalarResources: map[v1.ResourceName]int64{"example.com/othergpu": 1},
}),
newObj: &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
Name: "myclass",
},
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
expectedHint: fwk.QueueSkip,
},
"skip-on-class-update-with-implicit-extended-resource-name": {
pod: newResourcePod(framework.Resource{
ScalarResources: map[v1.ResourceName]int64{"deviceclass.resource.kubernetes.io/gpuclass": 1},
}),
newObj: &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
Name: "gpuclass",
},
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
oldObj: &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
Name: "gpuclass",
},
},
expectedHint: fwk.QueueSkip,
},
"queue-on-class-add-with-extended-resource-name": {
pod: newResourcePod(framework.Resource{
ScalarResources: map[v1.ResourceName]int64{"example.com/gpu": 1},
}),
newObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
expectedHint: fwk.Queue,
},
"queue-on-class-update-with-extended-resource-name": {
pod: newResourcePod(framework.Resource{
ScalarResources: map[v1.ResourceName]int64{"example.com/gpu": 1},
}),
newObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")},
},
oldObj: &resourceapi.DeviceClass{
Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu1")},
},
expectedHint: fwk.Queue,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
if err != nil {
t.Fatal(err)
}
actualHint, err := p.(*Fit).isSchedulableAfterDeviceClassEvent(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedHint, actualHint)
})
}
}
func TestIsFit(t *testing.T) {
testCases := map[string]struct {
pod *v1.Pod

View file

@ -462,6 +462,53 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{
EnableSchedulingQueueHint: sets.New(true),
EnableDRAExtendedResource: true,
},
{
Name: "Pod rejected by the NodeResourcesFit plugin is requeued when created DeviceClass having the extended resource matching pod's requests, and DRAExtendedResource is enabled",
EnablePlugins: []string{names.NodeResourcesFit},
InitialNodes: []*v1.Node{
st.MakeNode().Name("fake-node1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
},
Pods: []*v1.Pod{
// - Pod1 requests available amount of CPU (in fake-node1), but will be rejected due to lack of extended resource exampe.com/gpu.
st.MakePod().Name("pod1").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/gpu": "1"}).Container("image").Obj(),
st.MakePod().Name("pod2").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/othergpu": "1"}).Container("image").Obj(),
},
TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) {
// Trigger a DeviceClass Create event that has the extended resource name that matches pod's resource request.
if _, err := testCtx.ClientSet.ResourceV1().DeviceClasses().Create(testCtx.Ctx, &resourceapi.DeviceClass{ObjectMeta: metav1.ObjectMeta{Name: "fake-class"}, Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}}, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("failed to create the fake-class: %w", err)
}
return map[fwk.ClusterEvent]uint64{{Resource: fwk.DeviceClass, ActionType: fwk.Add}: 1}, nil
},
WantRequeuedPods: sets.New("pod1"),
EnableSchedulingQueueHint: sets.New(true),
EnableDRAExtendedResource: true,
},
{
Name: "Pod rejected by the NodeResourcesFit plugin is requeued when updated DeviceClass has the extended resource, and DRAExtendedResource is enabled",
EnablePlugins: []string{names.NodeResourcesFit},
InitialDeviceClasses: []*resourceapi.DeviceClass{{ObjectMeta: metav1.ObjectMeta{Name: "fake-class"}, Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: nil}}},
InitialNodes: []*v1.Node{
st.MakeNode().Name("fake-node1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
},
Pods: []*v1.Pod{
// - Pod1 requests available amount of CPU (in fake-node1), but will be rejected due to lack of extended resource example.com/gpu.
st.MakePod().Name("pod1").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/gpu": "1"}).Container("image").Obj(),
st.MakePod().Name("pod2").Res(map[v1.ResourceName]string{v1.ResourceCPU: "4", "example.com/othergpu": "1"}).Container("image").Obj(),
},
TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) {
// Trigger a DeviceClass Update event that adds the extended resource name that matches pod's resource request.
if _, err := testCtx.ClientSet.ResourceV1().DeviceClasses().Update(testCtx.Ctx, &resourceapi.DeviceClass{ObjectMeta: metav1.ObjectMeta{Name: "fake-class"}, Spec: resourceapi.DeviceClassSpec{ExtendedResourceName: ptr.To("example.com/gpu")}}, metav1.UpdateOptions{}); err != nil {
return nil, fmt.Errorf("failed to update the fake-class: %w", err)
}
return map[fwk.ClusterEvent]uint64{{Resource: fwk.DeviceClass, ActionType: fwk.Update}: 1}, nil
},
WantRequeuedPods: sets.New("pod1"),
EnableSchedulingQueueHint: sets.New(true),
EnableDRAExtendedResource: true,
},
{
Name: "Pod rejected by the NodeResourcesFit plugin isn't requeued when a Node is updated without increase in the requested resources",
EnablePlugins: []string{names.NodeResourcesFit, names.NodeAffinity},