diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index ad9b49d775c..7f4431cd492 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -22,7 +22,6 @@ import ( "fmt" "iter" "slices" - "sort" "strings" "sync" "time" @@ -32,17 +31,14 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" - resourcehelper "k8s.io/component-helpers/resource" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/resourceclaim" @@ -51,14 +47,10 @@ import ( fwk "k8s.io/kube-scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" - "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" - "k8s.io/kubernetes/pkg/scheduler/metrics" schedutil "k8s.io/kubernetes/pkg/scheduler/util" - "k8s.io/kubernetes/pkg/scheduler/util/assumecache" - "k8s.io/kubernetes/pkg/util/slice" "k8s.io/utils/ptr" ) @@ -67,18 +59,6 @@ const ( Name = names.DynamicResources stateKey fwk.StateKey = Name - - // specialClaimInMemName is the name of the special resource claim that - // exists only in memory. The claim will get a generated name when it is - // written to API server. - // - // It's intentionally not a valid ResourceClaim name to avoid conflicts with - // some actual ResourceClaim in the apiserver. - specialClaimInMemName = "" - - // AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting - // for the extended resource claim to be updated in assumed cache. - AssumeExtendedResourceTimeoutDefaultSeconds = 120 ) // The state is initialized in PreFilter phase. Because we save the pointer in @@ -133,13 +113,6 @@ func (d *stateData) Clone() fwk.StateData { return d } -// draExtendedResource stores data for extended resources backed by DRA. -// It will remain empty when the DRAExtendedResource feature is disabled. -type draExtendedResource struct { - // May have extended resource backed by DRA. - podScalarResources map[v1.ResourceName]int64 -} - type informationForClaim struct { // Node selector based on the claim status if allocated. availableOnNodes *nodeaffinity.NodeSelector @@ -412,116 +385,6 @@ func (pl *DynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso return nil } -// hasDeviceClassMappedExtendedResource returns true when the given resource list has an extended resource, that has -// a mapping to a device class. -func hasDeviceClassMappedExtendedResource(reqs v1.ResourceList, cache fwk.DeviceClassResolver) bool { - for rName, rValue := range reqs { - if rValue.IsZero() { - // We only care about the resources requested by the pod we are trying to schedule. - continue - } - if schedutil.IsDRAExtendedResourceName(rName) { - if cache.GetDeviceClass(rName) != nil { - return true - } - } - } - return false -} - -// findExtendedResourceClaim looks for the extended resource claim, i.e., the claim with special annotation -// set to "true", and with the pod as owner. It must be called with all ResourceClaims in the cluster. -// The returned ResourceClaim is read-only. -func findExtendedResourceClaim(pod *v1.Pod, resourceClaims []*resourceapi.ResourceClaim) *resourceapi.ResourceClaim { - for _, c := range resourceClaims { - if c.Annotations[resourceapi.ExtendedResourceClaimAnnotation] == "true" { - for _, or := range c.OwnerReferences { - if or.Name == pod.Name && *or.Controller && or.UID == pod.UID { - return c - } - } - } - } - return nil -} - -// preFilterExtendedResources checks if there is any extended resource in the -// pod requests that has a device class mapping, i.e., there is a device class -// that has spec.ExtendedResourceName or its implicit extended resource name -// matching the given extended resource in that pod requests. -// -// It looks for the special resource claim for the pod created from prior scheduling -// cycle. If not found, it creates the special claim with no Requests in the Spec, -// with a temporary UID, and the specialClaimInMemName name. -// Either way, the special claim is stored in state.claims. -// -// In addition, draExtendedResource is also stored in the cycle state. -// -// It returns the special ResourceClaim and an error status. It returns nil for both -// if the feature is disabled or not required for the Pod. -func (pl *DynamicResources) preFilterExtendedResources(pod *v1.Pod, logger klog.Logger, s *stateData) (*resourceapi.ResourceClaim, *fwk.Status) { - if !pl.fts.EnableDRAExtendedResource { - return nil, nil - } - - // Try to build device class mapping from cache - cache := pl.draManager.DeviceClassResolver() - reqs := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{}) - - hasExtendedResource := hasDeviceClassMappedExtendedResource(reqs, cache) - if !hasExtendedResource { - return nil, nil - } - r := framework.NewResource(reqs) - s.draExtendedResource.podScalarResources = r.ScalarResources - - resourceClaims, err := pl.draManager.ResourceClaims().List() - if err != nil { - return nil, statusError(logger, err, "listing ResourceClaims") - } - - // Check if the special resource claim has been created from prior scheduling cycle. - // - // If it was already allocated earlier, that allocation might not be valid anymore. - // We could try to check that, but it depends on various factors that are difficult to - // cover (basically needs to replicate allocator logic) and if it turns out that the - // allocation is stale, we would have to schedule with those allocated devices not - // available for a new allocation. This situation should be rare (= binding failure), - // so we solve it via brute-force - // - Kick off deallocation in the background. - // - Mark the pod as unschedulable. Successful deallocation will make it schedulable again. - extendedResourceClaim := findExtendedResourceClaim(pod, resourceClaims) - if extendedResourceClaim == nil { - // Create one special claim for all extended resources backed by DRA in the Pod. - // Create the ResourceClaim with pod as owner, with a generated name that uses - // -extended-resources- as base. The final name will get truncated if it - // would be too long. - extendedResourceClaim = &resourceapi.ResourceClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: pod.Namespace, - Name: specialClaimInMemName, - // fake temporary UID for use in SignalClaimPendingAllocation - UID: types.UID(uuid.NewUUID()), - GenerateName: pod.Name + "-extended-resources-", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Pod", - Name: pod.Name, - UID: pod.UID, - Controller: ptr.To(true), - }, - }, - Annotations: map[string]string{ - resourceapi.ExtendedResourceClaimAnnotation: "true", - }, - }, - Spec: resourceapi.ResourceClaimSpec{}, - } - } - return extendedResourceClaim, nil -} - // PreFilter invoked at the prefilter extension point to check if pod has all // immediate claims bound. UnschedulableAndUnresolvable is returned if // the pod cannot be scheduled at the moment on any node. @@ -725,76 +588,6 @@ func getStateData(cs fwk.CycleState) (*stateData, error) { return s, nil } -// filterExtendedResources computes the special claim's Requests based on the -// node's Allocatable. It returns the special claim updated to match what needs -// to be allocated through DRA for the node or nil if nothing needs to be allocated. -// -// It returns an error when the pod's extended resource requests cannot be allocated -// from node's Allocatable, nor matching any device class's explicit or implicit -// ExtendedResourceName. -func (pl *DynamicResources) filterExtendedResources(state *stateData, pod *v1.Pod, nodeInfo fwk.NodeInfo, logger klog.Logger) (*resourceapi.ResourceClaim, []v1.ContainerExtendedResourceRequest, *fwk.Status) { - extendedResourceClaim := state.claims.extendedResourceClaim() - if extendedResourceClaim == nil { - // Nothing to do. - return nil, nil, nil - } - - // The claim is from the prior scheduling cycle, return unschedulable such that it can be - // deleted at the PostFilter phase, and retry anew. - if extendedResourceClaim.Spec.Devices.Requests != nil { - return nil, nil, statusUnschedulable(logger, "cannot schedule extended resource claim", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "claim", klog.KObj(extendedResourceClaim)) - } - - extendedResources := make(map[v1.ResourceName]int64) - hasExtendedResource := false - cache := pl.draManager.DeviceClassResolver() - for rName, rQuant := range state.draExtendedResource.podScalarResources { - if !schedutil.IsDRAExtendedResourceName(rName) { - continue - } - // Skip in case request quantity is zero - if rQuant == 0 { - continue - } - allocatable, okScalar := nodeInfo.GetAllocatable().GetScalarResources()[rName] - isBackedByDRA := cache.GetDeviceClass(rName) != nil - if isBackedByDRA && allocatable == 0 { - // node needs to provide the resource via DRA - extendedResources[rName] = rQuant - hasExtendedResource = true - } else if !okScalar { - // has request neither provided by device plugin, nor backed by DRA, - // hence the pod does not fit the node. - return nil, nil, statusUnschedulable(logger, "cannot fit resource", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "resource", rName) - } - } - // No extended resources backed by DRA on this node. - // The pod may have extended resources, but they are all backed by device - // plugin, hence the noderesources plugin should have checked if the node - // can fit the pod. - // This dynamic resources plugin Filter phase has nothing left to do. - if state.claims.noUserClaim() && !hasExtendedResource { - // It cannot be allocated when reaching here, as the claim from prior scheduling cycle - // would return unschedulable earlier in this function. - return nil, nil, nil - } - - if extendedResourceClaim.Status.Allocation != nil { - // If it is already allocated, then we cannot simply allocate it again. - // - // It cannot be allocated when reaching here, as the claim found from prior scheduling cycle - // would return unschedulable earlier in this function. - return nil, nil, nil - } - - // Each node needs its own, potentially different variant of the claim. - nodeExtendedResourceClaim := extendedResourceClaim.DeepCopy() - reqs, mappings := createRequestsAndMappings(pod, extendedResources, logger, cache) - nodeExtendedResourceClaim.Spec.Devices.Requests = reqs - - return nodeExtendedResourceClaim, mappings, nil -} - // Filter invoked at the filter extension point. // It evaluates if a pod can fit due to the resources it requests, // for both allocated and unallocated claims. @@ -942,11 +735,6 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs fwk.CycleState, pod * return nil } -// isSpecialClaimName return true when the name is the specialClaimInMemName. -func isSpecialClaimName(name string) bool { - return name == specialClaimInMemName -} - // PostFilter checks whether there are allocated claims that could get // deallocated to help get the Pod schedulable. If yes, it picks one and // requests its deallocation. This only gets called when filtering found no @@ -1215,62 +1003,7 @@ func (pl *DynamicResources) Unreserve(ctx context.Context, cs fwk.CycleState, po } } } - - extendedResourceClaim := state.claims.extendedResourceClaim() - if extendedResourceClaim == nil { - // there is no extended resource claim - return - } - - if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(state.claims.getInitialExtendedResourceClaimUID()); deleted { - pl.draManager.ResourceClaims().AssumedClaimRestore(extendedResourceClaim.Namespace, extendedResourceClaim.Name) - } - if isSpecialClaimName(extendedResourceClaim.Name) { - // In memory temporary extended resource claim does not need to be deleted - return - } - logger.V(5).Info("delete extended resource backed by DRA", "resourceclaim", klog.KObj(extendedResourceClaim), "pod", klog.KObj(pod), "claim.UID", extendedResourceClaim.UID) - extendedResourceClaim = extendedResourceClaim.DeepCopy() - if err := pl.deleteClaim(ctx, extendedResourceClaim, logger); err != nil { - logger.Error(err, "delete", "resourceclaim", klog.KObj(extendedResourceClaim)) - } -} - -// deleteClaim deletes the claim after removing the finalizer from the claim, if there is any. -func (pl *DynamicResources) deleteClaim(ctx context.Context, claim *resourceapi.ResourceClaim, logger klog.Logger) error { - refreshClaim := false - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if refreshClaim { - updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("get resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err) - } - claim = updatedClaim - } else { - refreshClaim = true - } - // Remove the finalizer to unblock removal first. - builtinControllerFinalizer := slices.Index(claim.Finalizers, resourceapi.Finalizer) - if builtinControllerFinalizer >= 0 { - claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1) - } - - _, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("update resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err) - } - return nil - }) - if retryErr != nil { - return retryErr - } - - logger.V(5).Info("Delete", "resourceclaim", klog.KObj(claim)) - err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}) - if err != nil { - return err - } - return nil + pl.unreserveExtendedResourceClaim(ctx, logger, pod, state) } // PreBind gets called in a separate goroutine after it has been determined @@ -1357,169 +1090,6 @@ func (pl *DynamicResources) PreBindPreFlight(ctx context.Context, cs fwk.CycleSt return nil } -func partitionContainerIndices(containers []v1.Container, numInitContainers int) ([]int, []int) { - longLivedContainerIndices := make([]int, 0, len(containers)) - shortLivedInitContainerIndices := make([]int, 0, numInitContainers) - for i, c := range containers { - isInit := i < numInitContainers - isSidecar := c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways - if isInit && !isSidecar { - shortLivedInitContainerIndices = append(shortLivedInitContainerIndices, i) - continue - } - longLivedContainerIndices = append(longLivedContainerIndices, i) - } - return longLivedContainerIndices, shortLivedInitContainerIndices -} - -// createResourceRequestAndMappings returns the request and mappings for the given container and resource. -// reusableRequests is a list of other DeviceRequests this container can use before requesting its own. -// items in reusableRequests may be nil. -// The returned request may be nil if no additional request was required. -// The returned mappings may be empty if this container does not use this resource. -func createResourceRequestAndMappings(containerIndex int, container *v1.Container, rName v1.ResourceName, className string, reusableRequests []*resourceapi.DeviceRequest) (*resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) { - var mappings []v1.ContainerExtendedResourceRequest - creqs := container.Resources.Requests - if creqs == nil { - return nil, nil - } - var rQuant resource.Quantity - var ok bool - if rQuant, ok = creqs[rName]; !ok { - return nil, nil - } - crq, ok := (&rQuant).AsInt64() - if !ok || crq == 0 { - return nil, nil - } - sum := int64(0) - for _, r := range reusableRequests { - if r != nil { - sum += r.Exactly.Count - mappings = append(mappings, v1.ContainerExtendedResourceRequest{ - ContainerName: container.Name, - ResourceName: rName.String(), - RequestName: r.Name, - }) - if sum >= crq { - return nil, mappings - } - } - } - keys := make([]string, 0, len(creqs)) - for k := range creqs { - keys = append(keys, k.String()) - } - // resource requests in a container is a map, their names must - // be sorted to determine the resource's index order. - slice.SortStrings(keys) - ridx := 0 - for j := range keys { - if keys[j] == rName.String() { - ridx = j - break - } - } - // containerIndex is the index of the container in the list of initContainers + containers. - // ridx is the index of the extended resource request in the sorted all requests in the container. - // crq is the quantity of the extended resource request. - reqName := fmt.Sprintf("container-%d-request-%d", containerIndex, ridx) - deviceReq := resourceapi.DeviceRequest{ - Name: reqName, // need to be container name index - extended resource name index - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: className, - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: crq - sum, // the extra devices to request - }, - } - mappings = append(mappings, v1.ContainerExtendedResourceRequest{ - ContainerName: container.Name, - ResourceName: rName.String(), - RequestName: reqName, - }) - - return &deviceReq, mappings -} - -func createRequestsAndMappings(pod *v1.Pod, extendedResources map[v1.ResourceName]int64, logger klog.Logger, deviceClassMapping fwk.DeviceClassResolver) ([]resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) { - containers := slices.Clone(pod.Spec.InitContainers) - containers = append(containers, pod.Spec.Containers...) - longLivedContainerIndices, shortLivedInitContainerIndices := partitionContainerIndices(containers, len(pod.Spec.InitContainers)) - - // all requests across all containers and resource types - var deviceRequests []resourceapi.DeviceRequest - // all mappings across all containers and resource types - var mappings []v1.ContainerExtendedResourceRequest - - for resource := range extendedResources { - class := deviceClassMapping.GetDeviceClass(resource) - // skip if the resource does not map to a device class - if class == nil { - continue - } - - // shortLivedResourceMappings is the mapping of container+resource→request for short lived containers (init non-sidecar container) - var shortLivedResourceMappings []v1.ContainerExtendedResourceRequest - // longLivedResourceMappings is the mapping of container+resource→request for long lived containers (init sidecar or regular container) - var longLivedResourceMappings []v1.ContainerExtendedResourceRequest - - // longLivedResourceRequests is the list of requests for a given resource by long-lived containers. - // The length of this list is the same as the length of containers. - // Entries may be nil if the container at that index did not produce a request for that resource. - // Requests at later indices are reusable by non-sidecar initContainers at earlier indices. - longLivedResourceRequests := make([]*resourceapi.DeviceRequest, len(containers)) - for _, i := range longLivedContainerIndices { - containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, nil) - longLivedResourceRequests[i] = containerRequest // might be nil - longLivedResourceMappings = append(longLivedResourceMappings, containerMappings...) // might be zero-length - } - - // maxShortLivedResourceRequest is the maximum request for a given resource by short-lived containers - var maxShortLivedResourceRequest *resourceapi.DeviceRequest - // shortLivedRequestNames is all request names for a given resource by short-lived containers. All mappings to any name in - // this set will be replaced by maxShortLivedResourceRequest.Name. - shortLivedRequestNames := sets.New[string]() - for _, i := range shortLivedInitContainerIndices { - containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, longLivedResourceRequests[i:]) - if containerRequest != nil { - shortLivedRequestNames.Insert(containerRequest.Name) - if maxShortLivedResourceRequest == nil || maxShortLivedResourceRequest.Exactly.Count < containerRequest.Exactly.Count { - maxShortLivedResourceRequest = containerRequest - } - } - shortLivedResourceMappings = append(shortLivedResourceMappings, containerMappings...) // might be zero-length - } - - // rewrite mappings to short-lived requests to use the maximum short-lived request name - if maxShortLivedResourceRequest != nil && len(shortLivedRequestNames) > 1 { - shortLivedRequestNames.Delete(maxShortLivedResourceRequest.Name) - for i := range shortLivedResourceMappings { - if shortLivedRequestNames.Has(shortLivedResourceMappings[i].RequestName) { - shortLivedResourceMappings[i].RequestName = maxShortLivedResourceRequest.Name - } - } - } - - // append non-nil requests - if maxShortLivedResourceRequest != nil { - deviceRequests = append(deviceRequests, *maxShortLivedResourceRequest) - } - for _, request := range longLivedResourceRequests { - if request != nil { - deviceRequests = append(deviceRequests, *request) - } - } - // append mappings - mappings = append(mappings, longLivedResourceMappings...) - mappings = append(mappings, shortLivedResourceMappings...) - } - - sort.Slice(deviceRequests, func(i, j int) bool { - return deviceRequests[i].Name < deviceRequests[j].Name - }) - return deviceRequests, mappings -} - // bindClaim gets called by PreBind for claim which is not reserved for the pod yet. // It might not even be allocated. bindClaim then ensures that the allocation // and reservation are recorded. This finishes the work started in Reserve. @@ -1542,23 +1112,7 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind // completed, either successfully or with a failure. if resourceClaimModified { if isExtendedResourceClaim { - // Unlike other claims, extended resource claim is created in API server below. - // AssumeClaimAfterAPICall returns ErrNotFound when the informer update has not reached assumed cache yet. - // Hence we must poll and wait for it. - pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, true, - func(ctx context.Context) (bool, error) { - if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil { - if errors.Is(err, assumecache.ErrNotFound) { - return false, nil - } - logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err) - return false, err - } - return true, nil - }) - if pollErr != nil { - logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr) - } + pl.waitForExtendedClaimInAssumeCache(ctx, logger, claim) } else { // This can fail, but only for reasons that are okay (concurrent delete or update). // Shouldn't happen in this case. @@ -1576,28 +1130,13 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind // Create the special claim for extended resource backed by DRA if isExtendedResourceClaim && isSpecialClaimName(claim.Name) { - logger.V(5).Info("preparing to create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim)) - // Replace claim template with instantiated claim for the node. - if nodeAllocation, ok := state.nodeAllocations[nodeName]; ok && nodeAllocation.extendedResourceClaim != nil { - claim = nodeAllocation.extendedResourceClaim.DeepCopy() - } else { - return nil, fmt.Errorf("extended resource claim not found for node %s", nodeName) - } - logger.V(5).Info("create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim)) - // Clear fields which must or can not be set during creation. - claim.Status.Allocation = nil - claim.Name = "" - claim.UID = "" var err error - claim, err = pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Create(ctx, claim, metav1.CreateOptions{}) + claim, err = pl.createExtendedResourceClaimInAPI(ctx, logger, pod, nodeName, state) if err != nil { - metrics.ResourceClaimCreatesTotal.WithLabelValues("failure").Inc() - return nil, fmt.Errorf("create claim for extended resources %v: %w", klog.KObj(claim), err) + return nil, err } - metrics.ResourceClaimCreatesTotal.WithLabelValues("success").Inc() - resourceClaimModified = true - logger.V(5).Info("created claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim)) + resourceClaimModified = true // Track the actual extended ResourceClaim from now. // Relevant if we need to delete again in Unreserve. if err := state.claims.updateExtendedResourceClaim(claim); err != nil { @@ -1676,22 +1215,9 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind // Patch the pod status with the new information about the generated // special resource claim. if isExtendedResourceClaim { - var cer []v1.ContainerExtendedResourceRequest - if nodeAllocation, ok := state.nodeAllocations[nodeName]; ok { - cer = nodeAllocation.containerResourceRequestMappings - } - if len(cer) == 0 { - return nil, fmt.Errorf("nil or empty request mappings, no update of pod %s/%s ExtendedResourceClaimStatus", pod.Namespace, pod.Name) - } - - podStatusCopy := pod.Status.DeepCopy() - podStatusCopy.ExtendedResourceClaimStatus = &v1.PodExtendedResourceClaimStatus{ - RequestMappings: cer, - ResourceClaimName: claim.Name, - } - err := schedutil.PatchPodStatus(ctx, pl.clientset, pod.Name, pod.Namespace, &pod.Status, podStatusCopy) + err := pl.patchPodExtendedResourceClaimStatus(ctx, pod, claim, nodeName, state) if err != nil { - return nil, fmt.Errorf("update pod %s/%s ExtendedResourceClaimStatus: %w", pod.Namespace, pod.Name, err) + return nil, err } } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index d1ab39c166a..fc089251eab 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -3239,542 +3239,6 @@ func (m *mockDeviceClassResolver) GetDeviceClass(resourceName v1.ResourceName) * return m.mapping[resourceName] } -func Test_createRequestsAndMappings_requests(t *testing.T) { - pod1 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - v1.ResourceName(extendedResourceName + "1"): "2", - }). - Obj() - pod2 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "1"): "2", - }). - Obj() - - podInit := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "2", - }). - Obj() - podInit2 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "2", - }). - Obj() - podInit3 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - }). - SidecarReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "2", - }). - Obj() - - res := map[v1.ResourceName]int64{ - v1.ResourceName(extendedResourceName): 1, - } - res2 := map[v1.ResourceName]int64{ - v1.ResourceName(extendedResourceName): 1, - v1.ResourceName(extendedResourceName + "1"): 2, - } - resInit := map[v1.ResourceName]int64{ - v1.ResourceName(extendedResourceName): 1, - v1.ResourceName(extendedResourceName + "init"): 2, - } - devMap := map[v1.ResourceName]*resourceapi.DeviceClass{ - v1.ResourceName(extendedResourceName): { - ObjectMeta: metav1.ObjectMeta{ - Name: "class", - }, - }, - } - devMap2 := map[v1.ResourceName]*resourceapi.DeviceClass{ - v1.ResourceName(extendedResourceName): { - ObjectMeta: metav1.ObjectMeta{ - Name: "class", - }, - }, - v1.ResourceName(extendedResourceName + "1"): { - ObjectMeta: metav1.ObjectMeta{ - Name: "class1", - }, - }, - } - devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{ - v1.ResourceName(extendedResourceName): { - ObjectMeta: metav1.ObjectMeta{ - Name: "class", - }, - }, - v1.ResourceName(extendedResourceName + "init"): { - ObjectMeta: metav1.ObjectMeta{ - Name: "classInit", - }, - }, - } - devReq := resourceapi.DeviceRequest{ - Name: "container-0-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "class", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 1, - }, - } - devReq2 := resourceapi.DeviceRequest{ - Name: "container-0-request-1", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "class1", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 2, - }, - } - devReq3 := resourceapi.DeviceRequest{ - Name: "container-1-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "class1", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 2, - }, - } - devReqInit := resourceapi.DeviceRequest{ - Name: "container-1-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "class", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 1, - }, - } - devReqSidecar := resourceapi.DeviceRequest{ - Name: "container-1-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "classInit", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 1, - }, - } - devReq2Init := resourceapi.DeviceRequest{ - Name: "container-1-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "classInit", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 2, - }, - } - devReq6Init := resourceapi.DeviceRequest{ - Name: "container-0-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "classInit", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 2, - }, - } - devReq3Init := resourceapi.DeviceRequest{ - Name: "container-2-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "class", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 1, - }, - } - devReq4Init := resourceapi.DeviceRequest{ - Name: "container-3-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "class", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 1, - }, - } - devReq5Init := resourceapi.DeviceRequest{ - Name: "container-2-request-0", - Exactly: &resourceapi.ExactDeviceRequest{ - DeviceClassName: "classInit", - AllocationMode: resourceapi.DeviceAllocationModeExactCount, - Count: 2, - }, - } - - testcases := map[string]struct { - pod *v1.Pod - extendedResources map[v1.ResourceName]int64 - cache fwk.DeviceClassResolver - wantDeviceRequests []resourceapi.DeviceRequest - }{ - "nil": { - pod: pod1, - wantDeviceRequests: nil, - }, - "one resource match": { - pod: pod1, - extendedResources: res, - cache: &mockDeviceClassResolver{mapping: devMap}, - wantDeviceRequests: []resourceapi.DeviceRequest{devReq}, - }, - "one resource match, one resource not match": { - pod: pod1, - extendedResources: res2, - cache: &mockDeviceClassResolver{mapping: devMap}, - wantDeviceRequests: []resourceapi.DeviceRequest{devReq}, - }, - "two resources match": { - pod: pod1, - extendedResources: res2, - cache: &mockDeviceClassResolver{mapping: devMap2}, - wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq2}, - }, - "two containers match": { - pod: pod2, - extendedResources: res2, - cache: &mockDeviceClassResolver{mapping: devMap2}, - wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq3}, - }, - "one init container, one regular container": { - pod: podInit, - extendedResources: resInit, - cache: &mockDeviceClassResolver{mapping: devMapInit}, - wantDeviceRequests: []resourceapi.DeviceRequest{devReq6Init, devReqInit}, - }, - "two init containers, one regular container": { - pod: podInit2, - extendedResources: resInit, - cache: &mockDeviceClassResolver{mapping: devMapInit}, - wantDeviceRequests: []resourceapi.DeviceRequest{devReq2Init, devReq3Init}, - }, - "three init containers, one sidecar, one regular container": { - pod: podInit3, - extendedResources: resInit, - cache: &mockDeviceClassResolver{mapping: devMapInit}, - wantDeviceRequests: []resourceapi.DeviceRequest{devReqSidecar, devReq5Init, devReq4Init}, - }, - } - - for name, tc := range testcases { - t.Run(name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - gotDeviceRequests, _ := createRequestsAndMappings(tc.pod, tc.extendedResources, logger, tc.cache) - if len(tc.wantDeviceRequests) != len(gotDeviceRequests) { - t.Fatalf("different length, want %#v, len=%v, got %#v, len=%v", tc.wantDeviceRequests, len(tc.wantDeviceRequests), gotDeviceRequests, len(gotDeviceRequests)) - } - sort.Slice(gotDeviceRequests, func(i, j int) bool { return gotDeviceRequests[i].Name < gotDeviceRequests[j].Name }) - for i, r := range tc.wantDeviceRequests { - if r.Name != gotDeviceRequests[i].Name { - t.Errorf("different name, want %#v, got %#v", r, gotDeviceRequests[i]) - } - if r.Exactly.DeviceClassName != gotDeviceRequests[i].Exactly.DeviceClassName { - t.Errorf("different deviceClassName, want %#v, got %#v", r, gotDeviceRequests[i]) - } - if r.Exactly.AllocationMode != gotDeviceRequests[i].Exactly.AllocationMode { - t.Errorf("different allocationMode, want %#v, got %#v", r, gotDeviceRequests[i]) - } - if r.Exactly.Count != gotDeviceRequests[i].Exactly.Count { - t.Errorf("different count, want %#v, got %#v", r.Exactly.Count, gotDeviceRequests[i].Exactly.Count) - } - } - }) - } -} - -func Test_createRequestsAndMappings_mappings(t *testing.T) { - pod1 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - v1.ResourceName(extendedResourceName + "1"): "2", - }). - Obj() - pod1InitImplicit := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2", - }). - Obj() - pod2 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "1"): "2", - }). - Obj() - - podInit := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "2", - }). - Obj() - podInit2 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "2", - }). - Obj() - podInitImplicit := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2", - }). - Obj() - podInit3 := st.MakePod().Name(podName).Namespace(namespace). - UID(podUID). - Res(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - }). - SidecarReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "1", - }). - InitReq(map[v1.ResourceName]string{ - v1.ResourceName(extendedResourceName + "init"): "2", - }). - Obj() - res := map[v1.ResourceName]int64{ - v1.ResourceName(extendedResourceName): 1, - v1.ResourceName(extendedResourceName + "1"): 2, - } - resInit := map[v1.ResourceName]int64{ - v1.ResourceName(extendedResourceName): 1, - v1.ResourceName(extendedResourceName + "init"): 2, - } - resInitImplicit := map[v1.ResourceName]int64{ - v1.ResourceName(extendedResourceName): 1, - v1.ResourceName(extendedResourceName + "init"): 2, - v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): 2, - } - devMap := map[v1.ResourceName]*resourceapi.DeviceClass{ - v1.ResourceName(extendedResourceName): { - ObjectMeta: metav1.ObjectMeta{ - Name: "class", - }, - }, - v1.ResourceName(extendedResourceName + "1"): { - ObjectMeta: metav1.ObjectMeta{ - Name: "class1", - }, - }, - } - devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{ - v1.ResourceName(extendedResourceName): { - ObjectMeta: metav1.ObjectMeta{ - Name: "class", - }, - }, - v1.ResourceName(extendedResourceName + "init"): { - ObjectMeta: metav1.ObjectMeta{ - Name: "classInit", - }, - }, - v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): { - ObjectMeta: metav1.ObjectMeta{ - Name: "classInit", - }, - }, - } - cer := v1.ContainerExtendedResourceRequest{ - ContainerName: "con0", - ResourceName: extendedResourceName, - RequestName: "container-0-request-0", - } - cer1 := v1.ContainerExtendedResourceRequest{ - ContainerName: "con0", - ResourceName: extendedResourceName + "1", - RequestName: "container-0-request-1", - } - cer2 := v1.ContainerExtendedResourceRequest{ - ContainerName: "con1", - ResourceName: extendedResourceName + "1", - RequestName: "container-1-request-0", - } - cer3 := v1.ContainerExtendedResourceRequest{ - ContainerName: "con0", - ResourceName: extendedResourceName, - RequestName: "container-1-request-0", - } - cer4 := v1.ContainerExtendedResourceRequest{ - ContainerName: "con0", - ResourceName: extendedResourceName, - RequestName: "container-2-request-0", - } - cer5 := v1.ContainerExtendedResourceRequest{ - ContainerName: "con0", - ResourceName: extendedResourceName, - RequestName: "container-3-request-0", - } - cerInit := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con0", - ResourceName: extendedResourceName + "init", - RequestName: "container-1-request-0", - } - cerInit0 := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con0", - ResourceName: extendedResourceName + "init", - RequestName: "container-0-request-0", - } - cerInit1 := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con0", - ResourceName: extendedResourceName + "init", - RequestName: "container-1-request-0", - } - cerInit2 := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con1", - ResourceName: extendedResourceName + "init", - RequestName: "container-1-request-0", - } - cerInit3 := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con2", - ResourceName: extendedResourceName + "init", - RequestName: "container-2-request-0", - } - cerSidecar := v1.ContainerExtendedResourceRequest{ - ContainerName: "sidecar-con1", - ResourceName: extendedResourceName + "init", - RequestName: "container-1-request-0", - } - - cerInitImplicit := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con0", - ResourceName: extendedResourceName + "init", - RequestName: "container-0-request-0", - } - cerInit4Implicit := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con0", - ResourceName: extendedResourceName + "init", - RequestName: "container-0-request-1", - } - cerInit2Implicit := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con1", - ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit", - RequestName: "container-1-request-0", - } - cerInit3Implicit := v1.ContainerExtendedResourceRequest{ - ContainerName: "init-con0", - ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit", - RequestName: "container-0-request-0", - } - - testcases := map[string]struct { - pod *v1.Pod - extnededResources map[v1.ResourceName]int64 - deviceClassMapping fwk.DeviceClassResolver - wantReqMappings []v1.ContainerExtendedResourceRequest - }{ - "one container, two requests": { - pod: pod1, - extnededResources: res, - deviceClassMapping: &mockDeviceClassResolver{devMap}, - wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer1}, - }, - "one container, one explicit and one implicit request": { - pod: pod1InitImplicit, - extnededResources: resInitImplicit, - deviceClassMapping: &mockDeviceClassResolver{devMapInit}, - wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit3Implicit, cerInit4Implicit}, - }, - "two containers, two requests": { - pod: pod2, - extnededResources: res, - deviceClassMapping: &mockDeviceClassResolver{devMap}, - wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer2}, - }, - "one init container, one regular container, one request": { - pod: podInit, - extnededResources: resInit, - deviceClassMapping: &mockDeviceClassResolver{devMapInit}, - wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit0, cer3}, - }, - "three containers (two are init container), two requests": { - pod: podInit2, - extnededResources: resInit, - deviceClassMapping: &mockDeviceClassResolver{devMapInit}, - wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit, cerInit2, cer4}, - }, - "three containers (two are init container), both explicit and implicit resources": { - pod: podInitImplicit, - extnededResources: resInitImplicit, - deviceClassMapping: &mockDeviceClassResolver{devMapInit}, - wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInitImplicit, cerInit2Implicit, cer4}, - }, - "four containers (two are init container, one sidecar), three requests": { - pod: podInit3, - extnededResources: resInit, - deviceClassMapping: &mockDeviceClassResolver{devMapInit}, - wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit1, cerSidecar, cerInit3, cer5}, - }, - } - - for name, tc := range testcases { - t.Run(name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - _, gotReqMappings := createRequestsAndMappings(tc.pod, tc.extnededResources, logger, tc.deviceClassMapping) - if len(tc.wantReqMappings) != len(gotReqMappings) { - t.Fatalf("different length, want %#v, got %#v", tc.wantReqMappings, gotReqMappings) - } - sort.Slice(gotReqMappings, func(i, j int) bool { - if gotReqMappings[i].RequestName < gotReqMappings[j].RequestName { - return true - } - if gotReqMappings[i].RequestName > gotReqMappings[j].RequestName { - return false - } - return gotReqMappings[i].ContainerName < gotReqMappings[j].ContainerName - }) - for i, r := range tc.wantReqMappings { - if r.RequestName != gotReqMappings[i].RequestName { - t.Errorf("different request name, want %#v, got %#v", r, gotReqMappings[i]) - } - if r.ContainerName != gotReqMappings[i].ContainerName { - t.Errorf("different container name, want %#v, got %#v", r, gotReqMappings[i]) - } - if r.ResourceName != gotReqMappings[i].ResourceName { - t.Errorf("different resource name, want %#v, got %#v", r, gotReqMappings[i]) - } - } - }) - } -} - // TestAllocatorSelection covers the selection of a structured allocation implementation // based on actual Kubernetes feature gates. This test lives here instead of // k8s.io/dynamic-resource-allocation/structured because that code has no access diff --git a/pkg/scheduler/framework/plugins/dynamicresources/extendeddynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/extendeddynamicresources.go new file mode 100644 index 00000000000..1c677e728cd --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/extendeddynamicresources.go @@ -0,0 +1,616 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "context" + "errors" + "fmt" + "slices" + "sort" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/scheduler/metrics" + + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/util/retry" + resourcehelper "k8s.io/component-helpers/resource" + "k8s.io/klog/v2" + fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" + "k8s.io/kubernetes/pkg/util/slice" + "k8s.io/utils/ptr" +) + +// Extended Resources Backed by DRA - Scheduler Plugin Workflow by each extension points +// +// PreFilter - preFilterExtendedResources() +// - for pods using extended resources, find existing claim or create in-memory claim with temporary name "" +// - the in-memory claim is used to track and allocate resources, claim object is created in PreBind extension point. +// - store the claim in stateData for Filter extension point +// +// Filter - filterExtendedResources() +// - if stale claim with Spec is identified, return Unschedulable for PostFilter extension point to cleanup +// - check which resources satisfied by device plugin vs need DRA +// - if extended resources need to be allocated through DRA, create node-specific claim +// +// PostFilter +// - if extended resource claim has real name (not ""): +// - it's stale from prior cycle -> delete it -> trigger retry +// +// Reserve +// - Store allocation results from Filter in stateData +// - Mark the claim as "allocation in-flight" via SignalClaimPendingAllocation() +// +// Unreserve +// - Remove claim from in-flight allocations and restore assume cache +// - Delete claim from API server if it has real name +// +// PreBind - bindClaim() +// - For "" claims: create in API server and update stateData +// - Update claim status: add finalizer, allocation, and pod reservation +// - Store in assume cache (poll for extended resource claims) +// - Update pod.Status.ExtendedResourceClaimStatus with request mappings + +const ( + // specialClaimInMemName is the name of the special resource claim that + // exists only in memory. The claim will get a generated name when it is + // written to API server. + // + // It's intentionally not a valid ResourceClaim name to avoid conflicts with + // some actual ResourceClaim in the apiserver. + specialClaimInMemName = "" + + // AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting + // for the extended resource claim to be updated in assumed cache. + AssumeExtendedResourceTimeoutDefaultSeconds = 120 +) + +// draExtendedResource stores data for extended resources backed by DRA. +// It will remain empty when the DRAExtendedResource feature is disabled. +type draExtendedResource struct { + // May have extended resource backed by DRA. + podScalarResources map[v1.ResourceName]int64 +} + +// hasDeviceClassMappedExtendedResource returns true when the given resource list has an extended resource, that has +// a mapping to a device class. +func hasDeviceClassMappedExtendedResource(reqs v1.ResourceList, cache fwk.DeviceClassResolver) bool { + for rName, rValue := range reqs { + if rValue.IsZero() { + // We only care about the resources requested by the pod we are trying to schedule. + continue + } + if schedutil.IsDRAExtendedResourceName(rName) { + if cache.GetDeviceClass(rName) != nil { + return true + } + } + } + return false +} + +// findExtendedResourceClaim looks for the extended resource claim, i.e., the claim with special annotation +// set to "true", and with the pod as owner. It must be called with all ResourceClaims in the cluster. +// The returned ResourceClaim is read-only. +func findExtendedResourceClaim(pod *v1.Pod, resourceClaims []*resourceapi.ResourceClaim) *resourceapi.ResourceClaim { + for _, c := range resourceClaims { + if c.Annotations[resourceapi.ExtendedResourceClaimAnnotation] == "true" { + for _, or := range c.OwnerReferences { + if or.Name == pod.Name && *or.Controller && or.UID == pod.UID { + return c + } + } + } + } + return nil +} + +// preFilterExtendedResources checks if there is any extended resource in the +// pod requests that has a device class mapping, i.e., there is a device class +// that has spec.ExtendedResourceName or its implicit extended resource name +// matching the given extended resource in that pod requests. +// +// It looks for the special resource claim for the pod created from prior scheduling +// cycle. If not found, it creates the special claim with no Requests in the Spec, +// with a temporary UID, and the specialClaimInMemName name. +// Either way, the special claim is stored in state.claims. +// +// In addition, draExtendedResource is also stored in the cycle state. +// +// It returns the special ResourceClaim and an error status. It returns nil for both +// if the feature is disabled or not required for the Pod. +func (pl *DynamicResources) preFilterExtendedResources(pod *v1.Pod, logger klog.Logger, s *stateData) (*resourceapi.ResourceClaim, *fwk.Status) { + if !pl.fts.EnableDRAExtendedResource { + return nil, nil + } + + // Try to build device class mapping from cache + cache := pl.draManager.DeviceClassResolver() + reqs := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{}) + + hasExtendedResource := hasDeviceClassMappedExtendedResource(reqs, cache) + if !hasExtendedResource { + return nil, nil + } + r := framework.NewResource(reqs) + s.draExtendedResource.podScalarResources = r.ScalarResources + + resourceClaims, err := pl.draManager.ResourceClaims().List() + if err != nil { + return nil, statusError(logger, err, "listing ResourceClaims") + } + + // Check if the special resource claim has been created from prior scheduling cycle. + // + // If it was already allocated earlier, that allocation might not be valid anymore. + // We could try to check that, but it depends on various factors that are difficult to + // cover (basically needs to replicate allocator logic) and if it turns out that the + // allocation is stale, we would have to schedule with those allocated devices not + // available for a new allocation. This situation should be rare (= binding failure), + // so we solve it via brute-force + // - Kick off deallocation in the background. + // - Mark the pod as unschedulable. Successful deallocation will make it schedulable again. + extendedResourceClaim := findExtendedResourceClaim(pod, resourceClaims) + if extendedResourceClaim != nil { + return extendedResourceClaim, nil + } + // Create one special claim for all extended resources backed by DRA in the Pod. + // Create the ResourceClaim with pod as owner, with a generated name that uses + // -extended-resources- as base. The final name will get truncated if it + // would be too long. + return &resourceapi.ResourceClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod.Namespace, + Name: specialClaimInMemName, + // fake temporary UID for use in SignalClaimPendingAllocation + UID: types.UID(uuid.NewUUID()), + GenerateName: pod.Name + "-extended-resources-", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: pod.Name, + UID: pod.UID, + Controller: ptr.To(true), + }, + }, + Annotations: map[string]string{ + resourceapi.ExtendedResourceClaimAnnotation: "true", + }, + }, + Spec: resourceapi.ResourceClaimSpec{}, + }, nil +} + +// filterExtendedResources computes the special claim's Requests based on the +// node's Allocatable. It returns: +// - nil if nothing needs to be allocated, all the extended resources are satisfied by device plugin, or +// - the special claim updated to match what needs to be allocated through DRA for the node +// +// It returns an error when the pod's extended resource requests cannot be allocated +// from node's Allocatable, nor matching any device class's explicit or implicit +// ExtendedResourceName. +func (pl *DynamicResources) filterExtendedResources(state *stateData, pod *v1.Pod, nodeInfo fwk.NodeInfo, logger klog.Logger) (*resourceapi.ResourceClaim, []v1.ContainerExtendedResourceRequest, *fwk.Status) { + extendedResourceClaim := state.claims.extendedResourceClaim() + if extendedResourceClaim == nil { + // Nothing to do. + return nil, nil, nil + } + + // The claim is from the prior scheduling cycle, return unschedulable such that it can be + // deleted at the PostFilter phase, and retry anew. + if extendedResourceClaim.Spec.Devices.Requests != nil { + return nil, nil, statusUnschedulable(logger, "cannot schedule extended resource claim", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "claim", klog.KObj(extendedResourceClaim)) + } + + extendedResources := make(map[v1.ResourceName]int64) + hasExtendedResource := false + cache := pl.draManager.DeviceClassResolver() + for rName, rQuant := range state.draExtendedResource.podScalarResources { + if !schedutil.IsDRAExtendedResourceName(rName) { + continue + } + // Skip in case request quantity is zero + if rQuant == 0 { + continue + } + allocatable, okScalar := nodeInfo.GetAllocatable().GetScalarResources()[rName] + isBackedByDRA := cache.GetDeviceClass(rName) != nil + if isBackedByDRA && allocatable == 0 { + // node needs to provide the resource via DRA + extendedResources[rName] = rQuant + hasExtendedResource = true + } else if !okScalar { + // has request neither provided by device plugin, nor backed by DRA, + // hence the pod does not fit the node. + return nil, nil, statusUnschedulable(logger, "cannot fit resource", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "resource", rName) + } + } + // No extended resources backed by DRA on this node. + // The pod may have extended resources, but they are all backed by device + // plugin, hence the noderesources plugin should have checked if the node + // can fit the pod. + // This dynamic resources plugin Filter phase has nothing left to do. + if state.claims.noUserClaim() && !hasExtendedResource { + // It cannot be allocated when reaching here, as the claim from prior scheduling cycle + // would return unschedulable earlier in this function. + return nil, nil, nil + } + + if extendedResourceClaim.Status.Allocation != nil { + // If it is already allocated, then we cannot simply allocate it again. + // + // It cannot be allocated when reaching here, as the claim found from prior scheduling cycle + // would return unschedulable earlier in this function. + return nil, nil, nil + } + + // Each node needs its own, potentially different variant of the claim. + nodeExtendedResourceClaim := extendedResourceClaim.DeepCopy() + reqs, mappings := createRequestsAndMappings(pod, extendedResources, logger, cache) + nodeExtendedResourceClaim.Spec.Devices.Requests = reqs + + return nodeExtendedResourceClaim, mappings, nil +} + +// isSpecialClaimName return true when the name is the specialClaimInMemName. +func isSpecialClaimName(name string) bool { + return name == specialClaimInMemName +} + +// deleteClaim deletes the claim after removing the finalizer from the claim, if there is any. +func (pl *DynamicResources) deleteClaim(ctx context.Context, claim *resourceapi.ResourceClaim, logger klog.Logger) error { + refreshClaim := false + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if refreshClaim { + updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("get resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err) + } + claim = updatedClaim + } else { + refreshClaim = true + } + // Remove the finalizer to unblock removal first. + builtinControllerFinalizer := slices.Index(claim.Finalizers, resourceapi.Finalizer) + if builtinControllerFinalizer >= 0 { + claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1) + } + + _, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("update resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err) + } + return nil + }) + if retryErr != nil { + return retryErr + } + + logger.V(5).Info("Delete", "resourceclaim", klog.KObj(claim)) + err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}) + if err != nil { + return err + } + return nil +} + +func partitionContainerIndices(containers []v1.Container, numInitContainers int) ([]int, []int) { + longLivedContainerIndices := make([]int, 0, len(containers)) + shortLivedInitContainerIndices := make([]int, 0, numInitContainers) + for i, c := range containers { + isInit := i < numInitContainers + isSidecar := c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways + if isInit && !isSidecar { + shortLivedInitContainerIndices = append(shortLivedInitContainerIndices, i) + continue + } + longLivedContainerIndices = append(longLivedContainerIndices, i) + } + return longLivedContainerIndices, shortLivedInitContainerIndices +} + +// createResourceRequestAndMappings returns the request and mappings for the given container and resource. +// reusableRequests is a list of other DeviceRequests this container can use before requesting its own. +// items in reusableRequests may be nil. +// The returned request may be nil if no additional request was required. +// The returned mappings may be empty if this container does not use this resource. +func createResourceRequestAndMappings(containerIndex int, container *v1.Container, rName v1.ResourceName, className string, reusableRequests []*resourceapi.DeviceRequest) (*resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) { + var mappings []v1.ContainerExtendedResourceRequest + creqs := container.Resources.Requests + if creqs == nil { + return nil, nil + } + var rQuant resource.Quantity + var ok bool + if rQuant, ok = creqs[rName]; !ok { + return nil, nil + } + crq, ok := (&rQuant).AsInt64() + if !ok || crq == 0 { + return nil, nil + } + sum := int64(0) + for _, r := range reusableRequests { + if r != nil { + sum += r.Exactly.Count + mappings = append(mappings, v1.ContainerExtendedResourceRequest{ + ContainerName: container.Name, + ResourceName: rName.String(), + RequestName: r.Name, + }) + if sum >= crq { + return nil, mappings + } + } + } + keys := make([]string, 0, len(creqs)) + for k := range creqs { + keys = append(keys, k.String()) + } + // resource requests in a container is a map, their names must + // be sorted to determine the resource's index order. + slice.SortStrings(keys) + ridx := 0 + for j := range keys { + if keys[j] == rName.String() { + ridx = j + break + } + } + // containerIndex is the index of the container in the list of initContainers + containers. + // ridx is the index of the extended resource request in the sorted all requests in the container. + // crq is the quantity of the extended resource request. + reqName := fmt.Sprintf("container-%d-request-%d", containerIndex, ridx) + deviceReq := resourceapi.DeviceRequest{ + Name: reqName, // need to be container name index - extended resource name index + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: className, + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: crq - sum, // the extra devices to request + }, + } + mappings = append(mappings, v1.ContainerExtendedResourceRequest{ + ContainerName: container.Name, + ResourceName: rName.String(), + RequestName: reqName, + }) + + return &deviceReq, mappings +} + +func createRequestsAndMappings(pod *v1.Pod, extendedResources map[v1.ResourceName]int64, logger klog.Logger, deviceClassMapping fwk.DeviceClassResolver) ([]resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) { + containers := slices.Clone(pod.Spec.InitContainers) + containers = append(containers, pod.Spec.Containers...) + longLivedContainerIndices, shortLivedInitContainerIndices := partitionContainerIndices(containers, len(pod.Spec.InitContainers)) + + // all requests across all containers and resource types + var deviceRequests []resourceapi.DeviceRequest + // all mappings across all containers and resource types + var mappings []v1.ContainerExtendedResourceRequest + + // Sort resource names to ensure deterministic ordering of device requests and mappings. + // Maps have non-deterministic iteration order in Go, so we extract and sort the keys. + resourceNames := make([]v1.ResourceName, 0, len(extendedResources)) + for resource := range extendedResources { + resourceNames = append(resourceNames, resource) + } + sort.Slice(resourceNames, func(i, j int) bool { + return resourceNames[i] < resourceNames[j] + }) + + for _, resource := range resourceNames { + class := deviceClassMapping.GetDeviceClass(resource) + // skip if the resource does not map to a device class + if class == nil { + continue + } + + // shortLivedResourceMappings is the mapping of container+resource→request for short lived containers (init non-sidecar container) + var shortLivedResourceMappings []v1.ContainerExtendedResourceRequest + // longLivedResourceMappings is the mapping of container+resource→request for long lived containers (init sidecar or regular container) + var longLivedResourceMappings []v1.ContainerExtendedResourceRequest + + // longLivedResourceRequests is the list of requests for a given resource by long-lived containers. + // The length of this list is the same as the length of containers. + // Entries may be nil if the container at that index did not produce a request for that resource. + // Requests at later indices are reusable by non-sidecar initContainers at earlier indices. + longLivedResourceRequests := make([]*resourceapi.DeviceRequest, len(containers)) + for _, i := range longLivedContainerIndices { + containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, nil) + longLivedResourceRequests[i] = containerRequest // might be nil + longLivedResourceMappings = append(longLivedResourceMappings, containerMappings...) // might be zero-length + } + + // maxShortLivedResourceRequest is the maximum request for a given resource by short-lived containers + var maxShortLivedResourceRequest *resourceapi.DeviceRequest + // shortLivedRequestNames is all request names for a given resource by short-lived containers. All mappings to any name in + // this set will be replaced by maxShortLivedResourceRequest.Name. + shortLivedRequestNames := sets.New[string]() + for _, i := range shortLivedInitContainerIndices { + containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, longLivedResourceRequests[i:]) + if containerRequest != nil { + shortLivedRequestNames.Insert(containerRequest.Name) + if maxShortLivedResourceRequest == nil || maxShortLivedResourceRequest.Exactly.Count < containerRequest.Exactly.Count { + maxShortLivedResourceRequest = containerRequest + } + } + shortLivedResourceMappings = append(shortLivedResourceMappings, containerMappings...) // might be zero-length + } + + // rewrite mappings to short-lived requests to use the maximum short-lived request name + if maxShortLivedResourceRequest != nil && len(shortLivedRequestNames) > 1 { + shortLivedRequestNames.Delete(maxShortLivedResourceRequest.Name) + for i := range shortLivedResourceMappings { + if shortLivedRequestNames.Has(shortLivedResourceMappings[i].RequestName) { + shortLivedResourceMappings[i].RequestName = maxShortLivedResourceRequest.Name + } + } + } + + // append non-nil requests + if maxShortLivedResourceRequest != nil { + deviceRequests = append(deviceRequests, *maxShortLivedResourceRequest) + } + for _, request := range longLivedResourceRequests { + if request != nil { + deviceRequests = append(deviceRequests, *request) + } + } + // append mappings + mappings = append(mappings, longLivedResourceMappings...) + mappings = append(mappings, shortLivedResourceMappings...) + } + + sort.Slice(deviceRequests, func(i, j int) bool { + return deviceRequests[i].Name < deviceRequests[j].Name + }) + return deviceRequests, mappings +} + +// waitForExtendedClaimInAssumeCache polls the assume cache until the extended resource claim +// becomes visible. This is necessary because extended resource claims are created in the API +// server, and the informer update may not have reached the assume cache yet. +// +// AssumeClaimAfterAPICall returns ErrNotFound when the informer update hasn't arrived, +// so we poll with a timeout. +func (pl *DynamicResources) waitForExtendedClaimInAssumeCache( + ctx context.Context, + logger klog.Logger, + claim *resourceapi.ResourceClaim, +) { + pollErr := wait.PollUntilContextTimeout( + ctx, + 1*time.Second, + time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, + true, + func(ctx context.Context) (bool, error) { + if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil { + if errors.Is(err, assumecache.ErrNotFound) { + return false, nil + } + logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err) + return false, err + } + return true, nil + }, + ) + + if pollErr != nil { + logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr) + // Note: We log but don't fail - the claim was created successfully + } +} + +// createExtendedResourceClaimInAPI creates an extended resource claim in the API server. +func (pl *DynamicResources) createExtendedResourceClaimInAPI( + ctx context.Context, + logger klog.Logger, + pod *v1.Pod, + nodeName string, + state *stateData, +) (*resourceapi.ResourceClaim, error) { + logger.V(5).Info("preparing to create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName) + // Get the node-specific claim that was prepared during Filter phase + nodeAllocation, ok := state.nodeAllocations[nodeName] + if !ok || nodeAllocation.extendedResourceClaim == nil { + return nil, fmt.Errorf("extended resource claim not found for node %s", nodeName) + } + claim := nodeAllocation.extendedResourceClaim.DeepCopy() + + logger.V(5).Info("create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim)) + // Clear fields which must or can not be set during creation. + claim.Status.Allocation = nil + claim.Name = "" + claim.UID = "" + + createdClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Create(ctx, claim, metav1.CreateOptions{}) + if err != nil { + metrics.ResourceClaimCreatesTotal.WithLabelValues("failure").Inc() + return nil, fmt.Errorf("create claim for extended resources %v: %w", klog.KObj(claim), err) + } + metrics.ResourceClaimCreatesTotal.WithLabelValues("success").Inc() + logger.V(5).Info("created claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(createdClaim)) + + return createdClaim, nil +} + +// patchPodExtendedResourceClaimStatus updates the pod's status with information about +// the extended resource claim. +func (pl *DynamicResources) patchPodExtendedResourceClaimStatus( + ctx context.Context, + pod *v1.Pod, + claim *resourceapi.ResourceClaim, + nodeName string, + state *stateData, +) error { + var cer []v1.ContainerExtendedResourceRequest + if nodeAllocation, ok := state.nodeAllocations[nodeName]; ok { + cer = nodeAllocation.containerResourceRequestMappings + } + if len(cer) == 0 { + return fmt.Errorf("nil or empty request mappings, no update of pod %s/%s ExtendedResourceClaimStatus", pod.Namespace, pod.Name) + } + + podStatusCopy := pod.Status.DeepCopy() + podStatusCopy.ExtendedResourceClaimStatus = &v1.PodExtendedResourceClaimStatus{ + RequestMappings: cer, + ResourceClaimName: claim.Name, + } + err := schedutil.PatchPodStatus(ctx, pl.clientset, pod.Name, pod.Namespace, &pod.Status, podStatusCopy) + if err != nil { + return fmt.Errorf("update pod %s/%s ExtendedResourceClaimStatus: %w", pod.Namespace, pod.Name, err) + } + return nil +} + +// unreserveExtendedResourceClaim cleans up the scheduler-owned extended resource claim +// when scheduling fails. It reverts the assume cache, and deletes the claim from the API +// server if it was already created. +func (pl *DynamicResources) unreserveExtendedResourceClaim(ctx context.Context, logger klog.Logger, pod *v1.Pod, state *stateData) { + extendedResourceClaim := state.claims.extendedResourceClaim() + if extendedResourceClaim == nil { + // there is no extended resource claim + return + } + + // If the claim was marked as pending allocation (in-flight), remove that marker and restore + // the assumed claim state to what it was before this scheduling attempt. + if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(state.claims.getInitialExtendedResourceClaimUID()); deleted { + pl.draManager.ResourceClaims().AssumedClaimRestore(extendedResourceClaim.Namespace, extendedResourceClaim.Name) + } + if isSpecialClaimName(extendedResourceClaim.Name) { + // In memory temporary extended resource claim does not need to be deleted + return + } + // Claim was written to API server, need to delete it to prevent orphaned resources. + logger.V(5).Info("delete extended resource backed by DRA", "resourceclaim", klog.KObj(extendedResourceClaim), "pod", klog.KObj(pod), "claim.UID", extendedResourceClaim.UID) + extendedResourceClaim = extendedResourceClaim.DeepCopy() + if err := pl.deleteClaim(ctx, extendedResourceClaim, logger); err != nil { + logger.Error(err, "delete", "resourceclaim", klog.KObj(extendedResourceClaim)) + } +} diff --git a/pkg/scheduler/framework/plugins/dynamicresources/extendeddynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/extendeddynamicresources_test.go new file mode 100644 index 00000000000..d3c39c6172e --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/extendeddynamicresources_test.go @@ -0,0 +1,565 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "sort" + "testing" + + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fwk "k8s.io/kube-scheduler/framework" + st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/test/utils/ktesting" +) + +func Test_createRequestsAndMappings_requests(t *testing.T) { + pod1 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + v1.ResourceName(extendedResourceName + "1"): "2", + }). + Obj() + pod2 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "1"): "2", + }). + Obj() + + podInit := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "2", + }). + Obj() + podInit2 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "2", + }). + Obj() + podInit3 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + }). + SidecarReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "2", + }). + Obj() + + res := map[v1.ResourceName]int64{ + v1.ResourceName(extendedResourceName): 1, + } + res2 := map[v1.ResourceName]int64{ + v1.ResourceName(extendedResourceName): 1, + v1.ResourceName(extendedResourceName + "1"): 2, + } + resInit := map[v1.ResourceName]int64{ + v1.ResourceName(extendedResourceName): 1, + v1.ResourceName(extendedResourceName + "init"): 2, + } + devMap := map[v1.ResourceName]*resourceapi.DeviceClass{ + v1.ResourceName(extendedResourceName): { + ObjectMeta: metav1.ObjectMeta{ + Name: "class", + }, + }, + } + devMap2 := map[v1.ResourceName]*resourceapi.DeviceClass{ + v1.ResourceName(extendedResourceName): { + ObjectMeta: metav1.ObjectMeta{ + Name: "class", + }, + }, + v1.ResourceName(extendedResourceName + "1"): { + ObjectMeta: metav1.ObjectMeta{ + Name: "class1", + }, + }, + } + devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{ + v1.ResourceName(extendedResourceName): { + ObjectMeta: metav1.ObjectMeta{ + Name: "class", + }, + }, + v1.ResourceName(extendedResourceName + "init"): { + ObjectMeta: metav1.ObjectMeta{ + Name: "classInit", + }, + }, + } + devReq := resourceapi.DeviceRequest{ + Name: "container-0-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "class", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 1, + }, + } + devReq2 := resourceapi.DeviceRequest{ + Name: "container-0-request-1", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "class1", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 2, + }, + } + devReq3 := resourceapi.DeviceRequest{ + Name: "container-1-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "class1", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 2, + }, + } + devReqInit := resourceapi.DeviceRequest{ + Name: "container-1-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "class", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 1, + }, + } + devReqSidecar := resourceapi.DeviceRequest{ + Name: "container-1-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "classInit", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 1, + }, + } + devReq2Init := resourceapi.DeviceRequest{ + Name: "container-1-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "classInit", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 2, + }, + } + devReq6Init := resourceapi.DeviceRequest{ + Name: "container-0-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "classInit", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 2, + }, + } + devReq3Init := resourceapi.DeviceRequest{ + Name: "container-2-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "class", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 1, + }, + } + devReq4Init := resourceapi.DeviceRequest{ + Name: "container-3-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "class", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 1, + }, + } + devReq5Init := resourceapi.DeviceRequest{ + Name: "container-2-request-0", + Exactly: &resourceapi.ExactDeviceRequest{ + DeviceClassName: "classInit", + AllocationMode: resourceapi.DeviceAllocationModeExactCount, + Count: 2, + }, + } + + testcases := map[string]struct { + pod *v1.Pod + extendedResources map[v1.ResourceName]int64 + cache fwk.DeviceClassResolver + wantDeviceRequests []resourceapi.DeviceRequest + }{ + "nil": { + pod: pod1, + wantDeviceRequests: nil, + }, + "one resource match": { + pod: pod1, + extendedResources: res, + cache: &mockDeviceClassResolver{mapping: devMap}, + wantDeviceRequests: []resourceapi.DeviceRequest{devReq}, + }, + "one resource match, one resource not match": { + pod: pod1, + extendedResources: res2, + cache: &mockDeviceClassResolver{mapping: devMap}, + wantDeviceRequests: []resourceapi.DeviceRequest{devReq}, + }, + "two resources match": { + pod: pod1, + extendedResources: res2, + cache: &mockDeviceClassResolver{mapping: devMap2}, + wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq2}, + }, + "two containers match": { + pod: pod2, + extendedResources: res2, + cache: &mockDeviceClassResolver{mapping: devMap2}, + wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq3}, + }, + "one init container, one regular container": { + pod: podInit, + extendedResources: resInit, + cache: &mockDeviceClassResolver{mapping: devMapInit}, + wantDeviceRequests: []resourceapi.DeviceRequest{devReq6Init, devReqInit}, + }, + "two init containers, one regular container": { + pod: podInit2, + extendedResources: resInit, + cache: &mockDeviceClassResolver{mapping: devMapInit}, + wantDeviceRequests: []resourceapi.DeviceRequest{devReq2Init, devReq3Init}, + }, + "three init containers, one sidecar, one regular container": { + pod: podInit3, + extendedResources: resInit, + cache: &mockDeviceClassResolver{mapping: devMapInit}, + wantDeviceRequests: []resourceapi.DeviceRequest{devReqSidecar, devReq5Init, devReq4Init}, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + gotDeviceRequests, _ := createRequestsAndMappings(tc.pod, tc.extendedResources, logger, tc.cache) + if len(tc.wantDeviceRequests) != len(gotDeviceRequests) { + t.Fatalf("different length, want %#v, len=%v, got %#v, len=%v", tc.wantDeviceRequests, len(tc.wantDeviceRequests), gotDeviceRequests, len(gotDeviceRequests)) + } + // gotDeviceRequests should already be sorted by createRequestsAndMappings + for i, r := range tc.wantDeviceRequests { + if r.Name != gotDeviceRequests[i].Name { + t.Errorf("different name, want %#v, got %#v", r, gotDeviceRequests[i]) + } + if r.Exactly.DeviceClassName != gotDeviceRequests[i].Exactly.DeviceClassName { + t.Errorf("different deviceClassName, want %#v, got %#v", r, gotDeviceRequests[i]) + } + if r.Exactly.AllocationMode != gotDeviceRequests[i].Exactly.AllocationMode { + t.Errorf("different allocationMode, want %#v, got %#v", r, gotDeviceRequests[i]) + } + if r.Exactly.Count != gotDeviceRequests[i].Exactly.Count { + t.Errorf("different count, want %#v, got %#v", r.Exactly.Count, gotDeviceRequests[i].Exactly.Count) + } + } + }) + } +} + +func Test_createRequestsAndMappings_mappings(t *testing.T) { + pod1 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + v1.ResourceName(extendedResourceName + "1"): "2", + }). + Obj() + pod1InitImplicit := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2", + }). + Obj() + pod2 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "1"): "2", + }). + Obj() + + podInit := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "2", + }). + Obj() + podInit2 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "2", + }). + Obj() + podInitImplicit := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2", + }). + Obj() + podInit3 := st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Res(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + }). + SidecarReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "1", + }). + InitReq(map[v1.ResourceName]string{ + v1.ResourceName(extendedResourceName + "init"): "2", + }). + Obj() + res := map[v1.ResourceName]int64{ + v1.ResourceName(extendedResourceName): 1, + v1.ResourceName(extendedResourceName + "1"): 2, + } + resInit := map[v1.ResourceName]int64{ + v1.ResourceName(extendedResourceName): 1, + v1.ResourceName(extendedResourceName + "init"): 2, + } + resInitImplicit := map[v1.ResourceName]int64{ + v1.ResourceName(extendedResourceName): 1, + v1.ResourceName(extendedResourceName + "init"): 2, + v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): 2, + } + devMap := map[v1.ResourceName]*resourceapi.DeviceClass{ + v1.ResourceName(extendedResourceName): { + ObjectMeta: metav1.ObjectMeta{ + Name: "class", + }, + }, + v1.ResourceName(extendedResourceName + "1"): { + ObjectMeta: metav1.ObjectMeta{ + Name: "class1", + }, + }, + } + devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{ + v1.ResourceName(extendedResourceName): { + ObjectMeta: metav1.ObjectMeta{ + Name: "class", + }, + }, + v1.ResourceName(extendedResourceName + "init"): { + ObjectMeta: metav1.ObjectMeta{ + Name: "classInit", + }, + }, + v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): { + ObjectMeta: metav1.ObjectMeta{ + Name: "classInit", + }, + }, + } + cer := v1.ContainerExtendedResourceRequest{ + ContainerName: "con0", + ResourceName: extendedResourceName, + RequestName: "container-0-request-0", + } + cer1 := v1.ContainerExtendedResourceRequest{ + ContainerName: "con0", + ResourceName: extendedResourceName + "1", + RequestName: "container-0-request-1", + } + cer2 := v1.ContainerExtendedResourceRequest{ + ContainerName: "con1", + ResourceName: extendedResourceName + "1", + RequestName: "container-1-request-0", + } + cer3 := v1.ContainerExtendedResourceRequest{ + ContainerName: "con0", + ResourceName: extendedResourceName, + RequestName: "container-1-request-0", + } + cer4 := v1.ContainerExtendedResourceRequest{ + ContainerName: "con0", + ResourceName: extendedResourceName, + RequestName: "container-2-request-0", + } + cer5 := v1.ContainerExtendedResourceRequest{ + ContainerName: "con0", + ResourceName: extendedResourceName, + RequestName: "container-3-request-0", + } + cerInit := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con0", + ResourceName: extendedResourceName + "init", + RequestName: "container-1-request-0", + } + cerInit0 := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con0", + ResourceName: extendedResourceName + "init", + RequestName: "container-0-request-0", + } + cerInit1 := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con0", + ResourceName: extendedResourceName + "init", + RequestName: "container-1-request-0", + } + cerInit2 := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con1", + ResourceName: extendedResourceName + "init", + RequestName: "container-1-request-0", + } + cerInit3 := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con2", + ResourceName: extendedResourceName + "init", + RequestName: "container-2-request-0", + } + cerSidecar := v1.ContainerExtendedResourceRequest{ + ContainerName: "sidecar-con1", + ResourceName: extendedResourceName + "init", + RequestName: "container-1-request-0", + } + + cerInitImplicit := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con0", + ResourceName: extendedResourceName + "init", + RequestName: "container-0-request-0", + } + cerInit4Implicit := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con0", + ResourceName: extendedResourceName + "init", + RequestName: "container-0-request-1", + } + cerInit2Implicit := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con1", + ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit", + RequestName: "container-1-request-0", + } + cerInit3Implicit := v1.ContainerExtendedResourceRequest{ + ContainerName: "init-con0", + ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit", + RequestName: "container-0-request-0", + } + + testcases := map[string]struct { + pod *v1.Pod + extnededResources map[v1.ResourceName]int64 + deviceClassMapping fwk.DeviceClassResolver + wantReqMappings []v1.ContainerExtendedResourceRequest + }{ + "one container, two requests": { + pod: pod1, + extnededResources: res, + deviceClassMapping: &mockDeviceClassResolver{devMap}, + wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer1}, + }, + "one container, one explicit and one implicit request": { + pod: pod1InitImplicit, + extnededResources: resInitImplicit, + deviceClassMapping: &mockDeviceClassResolver{devMapInit}, + wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit3Implicit, cerInit4Implicit}, + }, + "two containers, two requests": { + pod: pod2, + extnededResources: res, + deviceClassMapping: &mockDeviceClassResolver{devMap}, + wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer2}, + }, + "one init container, one regular container, one request": { + pod: podInit, + extnededResources: resInit, + deviceClassMapping: &mockDeviceClassResolver{devMapInit}, + wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit0, cer3}, + }, + "three containers (two are init container), two requests": { + pod: podInit2, + extnededResources: resInit, + deviceClassMapping: &mockDeviceClassResolver{devMapInit}, + wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit, cerInit2, cer4}, + }, + "three containers (two are init container), both explicit and implicit resources": { + pod: podInitImplicit, + extnededResources: resInitImplicit, + deviceClassMapping: &mockDeviceClassResolver{devMapInit}, + wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInitImplicit, cerInit2Implicit, cer4}, + }, + "four containers (two are init container, one sidecar), three requests": { + pod: podInit3, + extnededResources: resInit, + deviceClassMapping: &mockDeviceClassResolver{devMapInit}, + wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit1, cerSidecar, cerInit3, cer5}, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + _, gotReqMappings := createRequestsAndMappings(tc.pod, tc.extnededResources, logger, tc.deviceClassMapping) + if len(tc.wantReqMappings) != len(gotReqMappings) { + t.Fatalf("different length, want %#v, got %#v", tc.wantReqMappings, gotReqMappings) + } + sort.Slice(gotReqMappings, func(i, j int) bool { + if gotReqMappings[i].RequestName < gotReqMappings[j].RequestName { + return true + } + if gotReqMappings[i].RequestName > gotReqMappings[j].RequestName { + return false + } + return gotReqMappings[i].ContainerName < gotReqMappings[j].ContainerName + }) + for i, r := range tc.wantReqMappings { + if r.RequestName != gotReqMappings[i].RequestName { + t.Errorf("different request name, want %#v, got %#v", r, gotReqMappings[i]) + } + if r.ContainerName != gotReqMappings[i].ContainerName { + t.Errorf("different container name, want %#v, got %#v", r, gotReqMappings[i]) + } + if r.ResourceName != gotReqMappings[i].ResourceName { + t.Errorf("different resource name, want %#v, got %#v", r, gotReqMappings[i]) + } + } + }) + } +}