mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-10 09:22:55 -04:00
Merge pull request #134935 from alaypatel07/refactor-dra-extended-resources
refactor dra extended resources implementation in scheduler plugin
This commit is contained in:
commit
6232175b94
4 changed files with 1188 additions and 1017 deletions
|
|
@ -22,7 +22,6 @@ import (
|
|||
"fmt"
|
||||
"iter"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -32,17 +31,14 @@ import (
|
|||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/util/retry"
|
||||
resourcehelper "k8s.io/component-helpers/resource"
|
||||
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
|
||||
"k8s.io/dynamic-resource-allocation/cel"
|
||||
"k8s.io/dynamic-resource-allocation/resourceclaim"
|
||||
|
|
@ -51,14 +47,10 @@ import (
|
|||
fwk "k8s.io/kube-scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
|
|
@ -67,18 +59,6 @@ const (
|
|||
Name = names.DynamicResources
|
||||
|
||||
stateKey fwk.StateKey = Name
|
||||
|
||||
// specialClaimInMemName is the name of the special resource claim that
|
||||
// exists only in memory. The claim will get a generated name when it is
|
||||
// written to API server.
|
||||
//
|
||||
// It's intentionally not a valid ResourceClaim name to avoid conflicts with
|
||||
// some actual ResourceClaim in the apiserver.
|
||||
specialClaimInMemName = "<extended-resources>"
|
||||
|
||||
// AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting
|
||||
// for the extended resource claim to be updated in assumed cache.
|
||||
AssumeExtendedResourceTimeoutDefaultSeconds = 120
|
||||
)
|
||||
|
||||
// The state is initialized in PreFilter phase. Because we save the pointer in
|
||||
|
|
@ -133,13 +113,6 @@ func (d *stateData) Clone() fwk.StateData {
|
|||
return d
|
||||
}
|
||||
|
||||
// draExtendedResource stores data for extended resources backed by DRA.
|
||||
// It will remain empty when the DRAExtendedResource feature is disabled.
|
||||
type draExtendedResource struct {
|
||||
// May have extended resource backed by DRA.
|
||||
podScalarResources map[v1.ResourceName]int64
|
||||
}
|
||||
|
||||
type informationForClaim struct {
|
||||
// Node selector based on the claim status if allocated.
|
||||
availableOnNodes *nodeaffinity.NodeSelector
|
||||
|
|
@ -412,116 +385,6 @@ func (pl *DynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso
|
|||
return nil
|
||||
}
|
||||
|
||||
// hasDeviceClassMappedExtendedResource returns true when the given resource list has an extended resource, that has
|
||||
// a mapping to a device class.
|
||||
func hasDeviceClassMappedExtendedResource(reqs v1.ResourceList, cache fwk.DeviceClassResolver) bool {
|
||||
for rName, rValue := range reqs {
|
||||
if rValue.IsZero() {
|
||||
// We only care about the resources requested by the pod we are trying to schedule.
|
||||
continue
|
||||
}
|
||||
if schedutil.IsDRAExtendedResourceName(rName) {
|
||||
if cache.GetDeviceClass(rName) != nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// findExtendedResourceClaim looks for the extended resource claim, i.e., the claim with special annotation
|
||||
// set to "true", and with the pod as owner. It must be called with all ResourceClaims in the cluster.
|
||||
// The returned ResourceClaim is read-only.
|
||||
func findExtendedResourceClaim(pod *v1.Pod, resourceClaims []*resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
|
||||
for _, c := range resourceClaims {
|
||||
if c.Annotations[resourceapi.ExtendedResourceClaimAnnotation] == "true" {
|
||||
for _, or := range c.OwnerReferences {
|
||||
if or.Name == pod.Name && *or.Controller && or.UID == pod.UID {
|
||||
return c
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// preFilterExtendedResources checks if there is any extended resource in the
|
||||
// pod requests that has a device class mapping, i.e., there is a device class
|
||||
// that has spec.ExtendedResourceName or its implicit extended resource name
|
||||
// matching the given extended resource in that pod requests.
|
||||
//
|
||||
// It looks for the special resource claim for the pod created from prior scheduling
|
||||
// cycle. If not found, it creates the special claim with no Requests in the Spec,
|
||||
// with a temporary UID, and the specialClaimInMemName name.
|
||||
// Either way, the special claim is stored in state.claims.
|
||||
//
|
||||
// In addition, draExtendedResource is also stored in the cycle state.
|
||||
//
|
||||
// It returns the special ResourceClaim and an error status. It returns nil for both
|
||||
// if the feature is disabled or not required for the Pod.
|
||||
func (pl *DynamicResources) preFilterExtendedResources(pod *v1.Pod, logger klog.Logger, s *stateData) (*resourceapi.ResourceClaim, *fwk.Status) {
|
||||
if !pl.fts.EnableDRAExtendedResource {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Try to build device class mapping from cache
|
||||
cache := pl.draManager.DeviceClassResolver()
|
||||
reqs := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{})
|
||||
|
||||
hasExtendedResource := hasDeviceClassMappedExtendedResource(reqs, cache)
|
||||
if !hasExtendedResource {
|
||||
return nil, nil
|
||||
}
|
||||
r := framework.NewResource(reqs)
|
||||
s.draExtendedResource.podScalarResources = r.ScalarResources
|
||||
|
||||
resourceClaims, err := pl.draManager.ResourceClaims().List()
|
||||
if err != nil {
|
||||
return nil, statusError(logger, err, "listing ResourceClaims")
|
||||
}
|
||||
|
||||
// Check if the special resource claim has been created from prior scheduling cycle.
|
||||
//
|
||||
// If it was already allocated earlier, that allocation might not be valid anymore.
|
||||
// We could try to check that, but it depends on various factors that are difficult to
|
||||
// cover (basically needs to replicate allocator logic) and if it turns out that the
|
||||
// allocation is stale, we would have to schedule with those allocated devices not
|
||||
// available for a new allocation. This situation should be rare (= binding failure),
|
||||
// so we solve it via brute-force
|
||||
// - Kick off deallocation in the background.
|
||||
// - Mark the pod as unschedulable. Successful deallocation will make it schedulable again.
|
||||
extendedResourceClaim := findExtendedResourceClaim(pod, resourceClaims)
|
||||
if extendedResourceClaim == nil {
|
||||
// Create one special claim for all extended resources backed by DRA in the Pod.
|
||||
// Create the ResourceClaim with pod as owner, with a generated name that uses
|
||||
// <pod name>-extended-resources- as base. The final name will get truncated if it
|
||||
// would be too long.
|
||||
extendedResourceClaim = &resourceapi.ResourceClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: pod.Namespace,
|
||||
Name: specialClaimInMemName,
|
||||
// fake temporary UID for use in SignalClaimPendingAllocation
|
||||
UID: types.UID(uuid.NewUUID()),
|
||||
GenerateName: pod.Name + "-extended-resources-",
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "Pod",
|
||||
Name: pod.Name,
|
||||
UID: pod.UID,
|
||||
Controller: ptr.To(true),
|
||||
},
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
resourceapi.ExtendedResourceClaimAnnotation: "true",
|
||||
},
|
||||
},
|
||||
Spec: resourceapi.ResourceClaimSpec{},
|
||||
}
|
||||
}
|
||||
return extendedResourceClaim, nil
|
||||
}
|
||||
|
||||
// PreFilter invoked at the prefilter extension point to check if pod has all
|
||||
// immediate claims bound. UnschedulableAndUnresolvable is returned if
|
||||
// the pod cannot be scheduled at the moment on any node.
|
||||
|
|
@ -725,76 +588,6 @@ func getStateData(cs fwk.CycleState) (*stateData, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
// filterExtendedResources computes the special claim's Requests based on the
|
||||
// node's Allocatable. It returns the special claim updated to match what needs
|
||||
// to be allocated through DRA for the node or nil if nothing needs to be allocated.
|
||||
//
|
||||
// It returns an error when the pod's extended resource requests cannot be allocated
|
||||
// from node's Allocatable, nor matching any device class's explicit or implicit
|
||||
// ExtendedResourceName.
|
||||
func (pl *DynamicResources) filterExtendedResources(state *stateData, pod *v1.Pod, nodeInfo fwk.NodeInfo, logger klog.Logger) (*resourceapi.ResourceClaim, []v1.ContainerExtendedResourceRequest, *fwk.Status) {
|
||||
extendedResourceClaim := state.claims.extendedResourceClaim()
|
||||
if extendedResourceClaim == nil {
|
||||
// Nothing to do.
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// The claim is from the prior scheduling cycle, return unschedulable such that it can be
|
||||
// deleted at the PostFilter phase, and retry anew.
|
||||
if extendedResourceClaim.Spec.Devices.Requests != nil {
|
||||
return nil, nil, statusUnschedulable(logger, "cannot schedule extended resource claim", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "claim", klog.KObj(extendedResourceClaim))
|
||||
}
|
||||
|
||||
extendedResources := make(map[v1.ResourceName]int64)
|
||||
hasExtendedResource := false
|
||||
cache := pl.draManager.DeviceClassResolver()
|
||||
for rName, rQuant := range state.draExtendedResource.podScalarResources {
|
||||
if !schedutil.IsDRAExtendedResourceName(rName) {
|
||||
continue
|
||||
}
|
||||
// Skip in case request quantity is zero
|
||||
if rQuant == 0 {
|
||||
continue
|
||||
}
|
||||
allocatable, okScalar := nodeInfo.GetAllocatable().GetScalarResources()[rName]
|
||||
isBackedByDRA := cache.GetDeviceClass(rName) != nil
|
||||
if isBackedByDRA && allocatable == 0 {
|
||||
// node needs to provide the resource via DRA
|
||||
extendedResources[rName] = rQuant
|
||||
hasExtendedResource = true
|
||||
} else if !okScalar {
|
||||
// has request neither provided by device plugin, nor backed by DRA,
|
||||
// hence the pod does not fit the node.
|
||||
return nil, nil, statusUnschedulable(logger, "cannot fit resource", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "resource", rName)
|
||||
}
|
||||
}
|
||||
// No extended resources backed by DRA on this node.
|
||||
// The pod may have extended resources, but they are all backed by device
|
||||
// plugin, hence the noderesources plugin should have checked if the node
|
||||
// can fit the pod.
|
||||
// This dynamic resources plugin Filter phase has nothing left to do.
|
||||
if state.claims.noUserClaim() && !hasExtendedResource {
|
||||
// It cannot be allocated when reaching here, as the claim from prior scheduling cycle
|
||||
// would return unschedulable earlier in this function.
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
if extendedResourceClaim.Status.Allocation != nil {
|
||||
// If it is already allocated, then we cannot simply allocate it again.
|
||||
//
|
||||
// It cannot be allocated when reaching here, as the claim found from prior scheduling cycle
|
||||
// would return unschedulable earlier in this function.
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// Each node needs its own, potentially different variant of the claim.
|
||||
nodeExtendedResourceClaim := extendedResourceClaim.DeepCopy()
|
||||
reqs, mappings := createRequestsAndMappings(pod, extendedResources, logger, cache)
|
||||
nodeExtendedResourceClaim.Spec.Devices.Requests = reqs
|
||||
|
||||
return nodeExtendedResourceClaim, mappings, nil
|
||||
}
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
// It evaluates if a pod can fit due to the resources it requests,
|
||||
// for both allocated and unallocated claims.
|
||||
|
|
@ -942,11 +735,6 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs fwk.CycleState, pod *
|
|||
return nil
|
||||
}
|
||||
|
||||
// isSpecialClaimName return true when the name is the specialClaimInMemName.
|
||||
func isSpecialClaimName(name string) bool {
|
||||
return name == specialClaimInMemName
|
||||
}
|
||||
|
||||
// PostFilter checks whether there are allocated claims that could get
|
||||
// deallocated to help get the Pod schedulable. If yes, it picks one and
|
||||
// requests its deallocation. This only gets called when filtering found no
|
||||
|
|
@ -1215,62 +1003,7 @@ func (pl *DynamicResources) Unreserve(ctx context.Context, cs fwk.CycleState, po
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
extendedResourceClaim := state.claims.extendedResourceClaim()
|
||||
if extendedResourceClaim == nil {
|
||||
// there is no extended resource claim
|
||||
return
|
||||
}
|
||||
|
||||
if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(state.claims.getInitialExtendedResourceClaimUID()); deleted {
|
||||
pl.draManager.ResourceClaims().AssumedClaimRestore(extendedResourceClaim.Namespace, extendedResourceClaim.Name)
|
||||
}
|
||||
if isSpecialClaimName(extendedResourceClaim.Name) {
|
||||
// In memory temporary extended resource claim does not need to be deleted
|
||||
return
|
||||
}
|
||||
logger.V(5).Info("delete extended resource backed by DRA", "resourceclaim", klog.KObj(extendedResourceClaim), "pod", klog.KObj(pod), "claim.UID", extendedResourceClaim.UID)
|
||||
extendedResourceClaim = extendedResourceClaim.DeepCopy()
|
||||
if err := pl.deleteClaim(ctx, extendedResourceClaim, logger); err != nil {
|
||||
logger.Error(err, "delete", "resourceclaim", klog.KObj(extendedResourceClaim))
|
||||
}
|
||||
}
|
||||
|
||||
// deleteClaim deletes the claim after removing the finalizer from the claim, if there is any.
|
||||
func (pl *DynamicResources) deleteClaim(ctx context.Context, claim *resourceapi.ResourceClaim, logger klog.Logger) error {
|
||||
refreshClaim := false
|
||||
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
if refreshClaim {
|
||||
updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("get resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err)
|
||||
}
|
||||
claim = updatedClaim
|
||||
} else {
|
||||
refreshClaim = true
|
||||
}
|
||||
// Remove the finalizer to unblock removal first.
|
||||
builtinControllerFinalizer := slices.Index(claim.Finalizers, resourceapi.Finalizer)
|
||||
if builtinControllerFinalizer >= 0 {
|
||||
claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1)
|
||||
}
|
||||
|
||||
_, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("update resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
|
||||
logger.V(5).Info("Delete", "resourceclaim", klog.KObj(claim))
|
||||
err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
pl.unreserveExtendedResourceClaim(ctx, logger, pod, state)
|
||||
}
|
||||
|
||||
// PreBind gets called in a separate goroutine after it has been determined
|
||||
|
|
@ -1357,169 +1090,6 @@ func (pl *DynamicResources) PreBindPreFlight(ctx context.Context, cs fwk.CycleSt
|
|||
return nil
|
||||
}
|
||||
|
||||
func partitionContainerIndices(containers []v1.Container, numInitContainers int) ([]int, []int) {
|
||||
longLivedContainerIndices := make([]int, 0, len(containers))
|
||||
shortLivedInitContainerIndices := make([]int, 0, numInitContainers)
|
||||
for i, c := range containers {
|
||||
isInit := i < numInitContainers
|
||||
isSidecar := c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways
|
||||
if isInit && !isSidecar {
|
||||
shortLivedInitContainerIndices = append(shortLivedInitContainerIndices, i)
|
||||
continue
|
||||
}
|
||||
longLivedContainerIndices = append(longLivedContainerIndices, i)
|
||||
}
|
||||
return longLivedContainerIndices, shortLivedInitContainerIndices
|
||||
}
|
||||
|
||||
// createResourceRequestAndMappings returns the request and mappings for the given container and resource.
|
||||
// reusableRequests is a list of other DeviceRequests this container can use before requesting its own.
|
||||
// items in reusableRequests may be nil.
|
||||
// The returned request may be nil if no additional request was required.
|
||||
// The returned mappings may be empty if this container does not use this resource.
|
||||
func createResourceRequestAndMappings(containerIndex int, container *v1.Container, rName v1.ResourceName, className string, reusableRequests []*resourceapi.DeviceRequest) (*resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) {
|
||||
var mappings []v1.ContainerExtendedResourceRequest
|
||||
creqs := container.Resources.Requests
|
||||
if creqs == nil {
|
||||
return nil, nil
|
||||
}
|
||||
var rQuant resource.Quantity
|
||||
var ok bool
|
||||
if rQuant, ok = creqs[rName]; !ok {
|
||||
return nil, nil
|
||||
}
|
||||
crq, ok := (&rQuant).AsInt64()
|
||||
if !ok || crq == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
sum := int64(0)
|
||||
for _, r := range reusableRequests {
|
||||
if r != nil {
|
||||
sum += r.Exactly.Count
|
||||
mappings = append(mappings, v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: container.Name,
|
||||
ResourceName: rName.String(),
|
||||
RequestName: r.Name,
|
||||
})
|
||||
if sum >= crq {
|
||||
return nil, mappings
|
||||
}
|
||||
}
|
||||
}
|
||||
keys := make([]string, 0, len(creqs))
|
||||
for k := range creqs {
|
||||
keys = append(keys, k.String())
|
||||
}
|
||||
// resource requests in a container is a map, their names must
|
||||
// be sorted to determine the resource's index order.
|
||||
slice.SortStrings(keys)
|
||||
ridx := 0
|
||||
for j := range keys {
|
||||
if keys[j] == rName.String() {
|
||||
ridx = j
|
||||
break
|
||||
}
|
||||
}
|
||||
// containerIndex is the index of the container in the list of initContainers + containers.
|
||||
// ridx is the index of the extended resource request in the sorted all requests in the container.
|
||||
// crq is the quantity of the extended resource request.
|
||||
reqName := fmt.Sprintf("container-%d-request-%d", containerIndex, ridx)
|
||||
deviceReq := resourceapi.DeviceRequest{
|
||||
Name: reqName, // need to be container name index - extended resource name index
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: className,
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: crq - sum, // the extra devices to request
|
||||
},
|
||||
}
|
||||
mappings = append(mappings, v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: container.Name,
|
||||
ResourceName: rName.String(),
|
||||
RequestName: reqName,
|
||||
})
|
||||
|
||||
return &deviceReq, mappings
|
||||
}
|
||||
|
||||
func createRequestsAndMappings(pod *v1.Pod, extendedResources map[v1.ResourceName]int64, logger klog.Logger, deviceClassMapping fwk.DeviceClassResolver) ([]resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) {
|
||||
containers := slices.Clone(pod.Spec.InitContainers)
|
||||
containers = append(containers, pod.Spec.Containers...)
|
||||
longLivedContainerIndices, shortLivedInitContainerIndices := partitionContainerIndices(containers, len(pod.Spec.InitContainers))
|
||||
|
||||
// all requests across all containers and resource types
|
||||
var deviceRequests []resourceapi.DeviceRequest
|
||||
// all mappings across all containers and resource types
|
||||
var mappings []v1.ContainerExtendedResourceRequest
|
||||
|
||||
for resource := range extendedResources {
|
||||
class := deviceClassMapping.GetDeviceClass(resource)
|
||||
// skip if the resource does not map to a device class
|
||||
if class == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// shortLivedResourceMappings is the mapping of container+resource→request for short lived containers (init non-sidecar container)
|
||||
var shortLivedResourceMappings []v1.ContainerExtendedResourceRequest
|
||||
// longLivedResourceMappings is the mapping of container+resource→request for long lived containers (init sidecar or regular container)
|
||||
var longLivedResourceMappings []v1.ContainerExtendedResourceRequest
|
||||
|
||||
// longLivedResourceRequests is the list of requests for a given resource by long-lived containers.
|
||||
// The length of this list is the same as the length of containers.
|
||||
// Entries may be nil if the container at that index did not produce a request for that resource.
|
||||
// Requests at later indices are reusable by non-sidecar initContainers at earlier indices.
|
||||
longLivedResourceRequests := make([]*resourceapi.DeviceRequest, len(containers))
|
||||
for _, i := range longLivedContainerIndices {
|
||||
containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, nil)
|
||||
longLivedResourceRequests[i] = containerRequest // might be nil
|
||||
longLivedResourceMappings = append(longLivedResourceMappings, containerMappings...) // might be zero-length
|
||||
}
|
||||
|
||||
// maxShortLivedResourceRequest is the maximum request for a given resource by short-lived containers
|
||||
var maxShortLivedResourceRequest *resourceapi.DeviceRequest
|
||||
// shortLivedRequestNames is all request names for a given resource by short-lived containers. All mappings to any name in
|
||||
// this set will be replaced by maxShortLivedResourceRequest.Name.
|
||||
shortLivedRequestNames := sets.New[string]()
|
||||
for _, i := range shortLivedInitContainerIndices {
|
||||
containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, longLivedResourceRequests[i:])
|
||||
if containerRequest != nil {
|
||||
shortLivedRequestNames.Insert(containerRequest.Name)
|
||||
if maxShortLivedResourceRequest == nil || maxShortLivedResourceRequest.Exactly.Count < containerRequest.Exactly.Count {
|
||||
maxShortLivedResourceRequest = containerRequest
|
||||
}
|
||||
}
|
||||
shortLivedResourceMappings = append(shortLivedResourceMappings, containerMappings...) // might be zero-length
|
||||
}
|
||||
|
||||
// rewrite mappings to short-lived requests to use the maximum short-lived request name
|
||||
if maxShortLivedResourceRequest != nil && len(shortLivedRequestNames) > 1 {
|
||||
shortLivedRequestNames.Delete(maxShortLivedResourceRequest.Name)
|
||||
for i := range shortLivedResourceMappings {
|
||||
if shortLivedRequestNames.Has(shortLivedResourceMappings[i].RequestName) {
|
||||
shortLivedResourceMappings[i].RequestName = maxShortLivedResourceRequest.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// append non-nil requests
|
||||
if maxShortLivedResourceRequest != nil {
|
||||
deviceRequests = append(deviceRequests, *maxShortLivedResourceRequest)
|
||||
}
|
||||
for _, request := range longLivedResourceRequests {
|
||||
if request != nil {
|
||||
deviceRequests = append(deviceRequests, *request)
|
||||
}
|
||||
}
|
||||
// append mappings
|
||||
mappings = append(mappings, longLivedResourceMappings...)
|
||||
mappings = append(mappings, shortLivedResourceMappings...)
|
||||
}
|
||||
|
||||
sort.Slice(deviceRequests, func(i, j int) bool {
|
||||
return deviceRequests[i].Name < deviceRequests[j].Name
|
||||
})
|
||||
return deviceRequests, mappings
|
||||
}
|
||||
|
||||
// bindClaim gets called by PreBind for claim which is not reserved for the pod yet.
|
||||
// It might not even be allocated. bindClaim then ensures that the allocation
|
||||
// and reservation are recorded. This finishes the work started in Reserve.
|
||||
|
|
@ -1542,23 +1112,7 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
|
|||
// completed, either successfully or with a failure.
|
||||
if resourceClaimModified {
|
||||
if isExtendedResourceClaim {
|
||||
// Unlike other claims, extended resource claim is created in API server below.
|
||||
// AssumeClaimAfterAPICall returns ErrNotFound when the informer update has not reached assumed cache yet.
|
||||
// Hence we must poll and wait for it.
|
||||
pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, true,
|
||||
func(ctx context.Context) (bool, error) {
|
||||
if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
|
||||
if errors.Is(err, assumecache.ErrNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err)
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if pollErr != nil {
|
||||
logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr)
|
||||
}
|
||||
pl.waitForExtendedClaimInAssumeCache(ctx, logger, claim)
|
||||
} else {
|
||||
// This can fail, but only for reasons that are okay (concurrent delete or update).
|
||||
// Shouldn't happen in this case.
|
||||
|
|
@ -1576,28 +1130,13 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
|
|||
|
||||
// Create the special claim for extended resource backed by DRA
|
||||
if isExtendedResourceClaim && isSpecialClaimName(claim.Name) {
|
||||
logger.V(5).Info("preparing to create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))
|
||||
// Replace claim template with instantiated claim for the node.
|
||||
if nodeAllocation, ok := state.nodeAllocations[nodeName]; ok && nodeAllocation.extendedResourceClaim != nil {
|
||||
claim = nodeAllocation.extendedResourceClaim.DeepCopy()
|
||||
} else {
|
||||
return nil, fmt.Errorf("extended resource claim not found for node %s", nodeName)
|
||||
}
|
||||
logger.V(5).Info("create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))
|
||||
// Clear fields which must or can not be set during creation.
|
||||
claim.Status.Allocation = nil
|
||||
claim.Name = ""
|
||||
claim.UID = ""
|
||||
var err error
|
||||
claim, err = pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Create(ctx, claim, metav1.CreateOptions{})
|
||||
claim, err = pl.createExtendedResourceClaimInAPI(ctx, logger, pod, nodeName, state)
|
||||
if err != nil {
|
||||
metrics.ResourceClaimCreatesTotal.WithLabelValues("failure").Inc()
|
||||
return nil, fmt.Errorf("create claim for extended resources %v: %w", klog.KObj(claim), err)
|
||||
return nil, err
|
||||
}
|
||||
metrics.ResourceClaimCreatesTotal.WithLabelValues("success").Inc()
|
||||
resourceClaimModified = true
|
||||
logger.V(5).Info("created claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))
|
||||
|
||||
resourceClaimModified = true
|
||||
// Track the actual extended ResourceClaim from now.
|
||||
// Relevant if we need to delete again in Unreserve.
|
||||
if err := state.claims.updateExtendedResourceClaim(claim); err != nil {
|
||||
|
|
@ -1676,22 +1215,9 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
|
|||
// Patch the pod status with the new information about the generated
|
||||
// special resource claim.
|
||||
if isExtendedResourceClaim {
|
||||
var cer []v1.ContainerExtendedResourceRequest
|
||||
if nodeAllocation, ok := state.nodeAllocations[nodeName]; ok {
|
||||
cer = nodeAllocation.containerResourceRequestMappings
|
||||
}
|
||||
if len(cer) == 0 {
|
||||
return nil, fmt.Errorf("nil or empty request mappings, no update of pod %s/%s ExtendedResourceClaimStatus", pod.Namespace, pod.Name)
|
||||
}
|
||||
|
||||
podStatusCopy := pod.Status.DeepCopy()
|
||||
podStatusCopy.ExtendedResourceClaimStatus = &v1.PodExtendedResourceClaimStatus{
|
||||
RequestMappings: cer,
|
||||
ResourceClaimName: claim.Name,
|
||||
}
|
||||
err := schedutil.PatchPodStatus(ctx, pl.clientset, pod.Name, pod.Namespace, &pod.Status, podStatusCopy)
|
||||
err := pl.patchPodExtendedResourceClaimStatus(ctx, pod, claim, nodeName, state)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("update pod %s/%s ExtendedResourceClaimStatus: %w", pod.Namespace, pod.Name, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3239,542 +3239,6 @@ func (m *mockDeviceClassResolver) GetDeviceClass(resourceName v1.ResourceName) *
|
|||
return m.mapping[resourceName]
|
||||
}
|
||||
|
||||
func Test_createRequestsAndMappings_requests(t *testing.T) {
|
||||
pod1 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
pod2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
|
||||
podInit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit3 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
SidecarReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
|
||||
res := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
}
|
||||
res2 := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "1"): 2,
|
||||
}
|
||||
resInit := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "init"): 2,
|
||||
}
|
||||
devMap := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
}
|
||||
devMap2 := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "1"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class1",
|
||||
},
|
||||
},
|
||||
}
|
||||
devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "init"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "classInit",
|
||||
},
|
||||
},
|
||||
}
|
||||
devReq := resourceapi.DeviceRequest{
|
||||
Name: "container-0-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq2 := resourceapi.DeviceRequest{
|
||||
Name: "container-0-request-1",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class1",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReq3 := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class1",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReqInit := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReqSidecar := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq2Init := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReq6Init := resourceapi.DeviceRequest{
|
||||
Name: "container-0-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReq3Init := resourceapi.DeviceRequest{
|
||||
Name: "container-2-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq4Init := resourceapi.DeviceRequest{
|
||||
Name: "container-3-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq5Init := resourceapi.DeviceRequest{
|
||||
Name: "container-2-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
|
||||
testcases := map[string]struct {
|
||||
pod *v1.Pod
|
||||
extendedResources map[v1.ResourceName]int64
|
||||
cache fwk.DeviceClassResolver
|
||||
wantDeviceRequests []resourceapi.DeviceRequest
|
||||
}{
|
||||
"nil": {
|
||||
pod: pod1,
|
||||
wantDeviceRequests: nil,
|
||||
},
|
||||
"one resource match": {
|
||||
pod: pod1,
|
||||
extendedResources: res,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq},
|
||||
},
|
||||
"one resource match, one resource not match": {
|
||||
pod: pod1,
|
||||
extendedResources: res2,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq},
|
||||
},
|
||||
"two resources match": {
|
||||
pod: pod1,
|
||||
extendedResources: res2,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap2},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq2},
|
||||
},
|
||||
"two containers match": {
|
||||
pod: pod2,
|
||||
extendedResources: res2,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap2},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq3},
|
||||
},
|
||||
"one init container, one regular container": {
|
||||
pod: podInit,
|
||||
extendedResources: resInit,
|
||||
cache: &mockDeviceClassResolver{mapping: devMapInit},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq6Init, devReqInit},
|
||||
},
|
||||
"two init containers, one regular container": {
|
||||
pod: podInit2,
|
||||
extendedResources: resInit,
|
||||
cache: &mockDeviceClassResolver{mapping: devMapInit},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq2Init, devReq3Init},
|
||||
},
|
||||
"three init containers, one sidecar, one regular container": {
|
||||
pod: podInit3,
|
||||
extendedResources: resInit,
|
||||
cache: &mockDeviceClassResolver{mapping: devMapInit},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReqSidecar, devReq5Init, devReq4Init},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testcases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
gotDeviceRequests, _ := createRequestsAndMappings(tc.pod, tc.extendedResources, logger, tc.cache)
|
||||
if len(tc.wantDeviceRequests) != len(gotDeviceRequests) {
|
||||
t.Fatalf("different length, want %#v, len=%v, got %#v, len=%v", tc.wantDeviceRequests, len(tc.wantDeviceRequests), gotDeviceRequests, len(gotDeviceRequests))
|
||||
}
|
||||
sort.Slice(gotDeviceRequests, func(i, j int) bool { return gotDeviceRequests[i].Name < gotDeviceRequests[j].Name })
|
||||
for i, r := range tc.wantDeviceRequests {
|
||||
if r.Name != gotDeviceRequests[i].Name {
|
||||
t.Errorf("different name, want %#v, got %#v", r, gotDeviceRequests[i])
|
||||
}
|
||||
if r.Exactly.DeviceClassName != gotDeviceRequests[i].Exactly.DeviceClassName {
|
||||
t.Errorf("different deviceClassName, want %#v, got %#v", r, gotDeviceRequests[i])
|
||||
}
|
||||
if r.Exactly.AllocationMode != gotDeviceRequests[i].Exactly.AllocationMode {
|
||||
t.Errorf("different allocationMode, want %#v, got %#v", r, gotDeviceRequests[i])
|
||||
}
|
||||
if r.Exactly.Count != gotDeviceRequests[i].Exactly.Count {
|
||||
t.Errorf("different count, want %#v, got %#v", r.Exactly.Count, gotDeviceRequests[i].Exactly.Count)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_createRequestsAndMappings_mappings(t *testing.T) {
|
||||
pod1 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
pod1InitImplicit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2",
|
||||
}).
|
||||
Obj()
|
||||
pod2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
|
||||
podInit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInitImplicit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit3 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
SidecarReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
res := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "1"): 2,
|
||||
}
|
||||
resInit := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "init"): 2,
|
||||
}
|
||||
resInitImplicit := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "init"): 2,
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): 2,
|
||||
}
|
||||
devMap := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "1"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class1",
|
||||
},
|
||||
},
|
||||
}
|
||||
devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "init"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "classInit",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "classInit",
|
||||
},
|
||||
},
|
||||
}
|
||||
cer := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
cer1 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName + "1",
|
||||
RequestName: "container-0-request-1",
|
||||
}
|
||||
cer2 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con1",
|
||||
ResourceName: extendedResourceName + "1",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cer3 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cer4 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-2-request-0",
|
||||
}
|
||||
cer5 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-3-request-0",
|
||||
}
|
||||
cerInit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit0 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
cerInit1 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit2 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con1",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit3 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con2",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-2-request-0",
|
||||
}
|
||||
cerSidecar := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "sidecar-con1",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
|
||||
cerInitImplicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
cerInit4Implicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-0-request-1",
|
||||
}
|
||||
cerInit2Implicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con1",
|
||||
ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit3Implicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit",
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
|
||||
testcases := map[string]struct {
|
||||
pod *v1.Pod
|
||||
extnededResources map[v1.ResourceName]int64
|
||||
deviceClassMapping fwk.DeviceClassResolver
|
||||
wantReqMappings []v1.ContainerExtendedResourceRequest
|
||||
}{
|
||||
"one container, two requests": {
|
||||
pod: pod1,
|
||||
extnededResources: res,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMap},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer1},
|
||||
},
|
||||
"one container, one explicit and one implicit request": {
|
||||
pod: pod1InitImplicit,
|
||||
extnededResources: resInitImplicit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit3Implicit, cerInit4Implicit},
|
||||
},
|
||||
"two containers, two requests": {
|
||||
pod: pod2,
|
||||
extnededResources: res,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMap},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer2},
|
||||
},
|
||||
"one init container, one regular container, one request": {
|
||||
pod: podInit,
|
||||
extnededResources: resInit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit0, cer3},
|
||||
},
|
||||
"three containers (two are init container), two requests": {
|
||||
pod: podInit2,
|
||||
extnededResources: resInit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit, cerInit2, cer4},
|
||||
},
|
||||
"three containers (two are init container), both explicit and implicit resources": {
|
||||
pod: podInitImplicit,
|
||||
extnededResources: resInitImplicit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInitImplicit, cerInit2Implicit, cer4},
|
||||
},
|
||||
"four containers (two are init container, one sidecar), three requests": {
|
||||
pod: podInit3,
|
||||
extnededResources: resInit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit1, cerSidecar, cerInit3, cer5},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testcases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
_, gotReqMappings := createRequestsAndMappings(tc.pod, tc.extnededResources, logger, tc.deviceClassMapping)
|
||||
if len(tc.wantReqMappings) != len(gotReqMappings) {
|
||||
t.Fatalf("different length, want %#v, got %#v", tc.wantReqMappings, gotReqMappings)
|
||||
}
|
||||
sort.Slice(gotReqMappings, func(i, j int) bool {
|
||||
if gotReqMappings[i].RequestName < gotReqMappings[j].RequestName {
|
||||
return true
|
||||
}
|
||||
if gotReqMappings[i].RequestName > gotReqMappings[j].RequestName {
|
||||
return false
|
||||
}
|
||||
return gotReqMappings[i].ContainerName < gotReqMappings[j].ContainerName
|
||||
})
|
||||
for i, r := range tc.wantReqMappings {
|
||||
if r.RequestName != gotReqMappings[i].RequestName {
|
||||
t.Errorf("different request name, want %#v, got %#v", r, gotReqMappings[i])
|
||||
}
|
||||
if r.ContainerName != gotReqMappings[i].ContainerName {
|
||||
t.Errorf("different container name, want %#v, got %#v", r, gotReqMappings[i])
|
||||
}
|
||||
if r.ResourceName != gotReqMappings[i].ResourceName {
|
||||
t.Errorf("different resource name, want %#v, got %#v", r, gotReqMappings[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestAllocatorSelection covers the selection of a structured allocation implementation
|
||||
// based on actual Kubernetes feature gates. This test lives here instead of
|
||||
// k8s.io/dynamic-resource-allocation/structured because that code has no access
|
||||
|
|
|
|||
|
|
@ -0,0 +1,616 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package dynamicresources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
resourceapi "k8s.io/api/resource/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/client-go/util/retry"
|
||||
resourcehelper "k8s.io/component-helpers/resource"
|
||||
"k8s.io/klog/v2"
|
||||
fwk "k8s.io/kube-scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
// Extended Resources Backed by DRA - scheduler plugin workflow at each extension point
|
||||
//
|
||||
// PreFilter - preFilterExtendedResources()
|
||||
// - for pods using extended resources, find existing claim or create in-memory claim with temporary name "<extended-resources>"
|
||||
// - the in-memory claim is used to track and allocate resources, claim object is created in PreBind extension point.
|
||||
// - store the claim in stateData for Filter extension point
|
||||
//
|
||||
// Filter - filterExtendedResources()
|
||||
// - if stale claim with Spec is identified, return Unschedulable for PostFilter extension point to cleanup
|
||||
// - check which resources are satisfied by a device plugin and which need DRA
|
||||
// - if extended resources need to be allocated through DRA, create node-specific claim
|
||||
//
|
||||
// PostFilter
|
||||
// - if extended resource claim has real name (not "<extended-resources>"):
|
||||
// - it's stale from prior cycle -> delete it -> trigger retry
|
||||
//
|
||||
// Reserve
|
||||
// - Store allocation results from Filter in stateData
|
||||
// - Mark the claim as "allocation in-flight" via SignalClaimPendingAllocation()
|
||||
//
|
||||
// Unreserve
|
||||
// - Remove claim from in-flight allocations and restore assume cache
|
||||
// - Delete claim from API server if it has real name
|
||||
//
|
||||
// PreBind - bindClaim()
|
||||
// - For "<extended-resources>" claims: create in API server and update stateData
|
||||
// - Update claim status: add finalizer, allocation, and pod reservation
|
||||
// - Store in assume cache (poll for extended resource claims)
|
||||
// - Update pod.Status.ExtendedResourceClaimStatus with request mappings
|
||||
|
||||
const (
	// specialClaimInMemName names the special extended resource claim while
	// it exists only in memory; a generated name replaces it once the claim
	// is written to the API server.
	//
	// The value is deliberately not a valid ResourceClaim name, so it cannot
	// collide with an actual ResourceClaim in the apiserver.
	specialClaimInMemName = "<extended-resources>"

	// AssumeExtendedResourceTimeoutDefaultSeconds is the default number of
	// seconds to wait for the extended resource claim to be updated in the
	// assume cache.
	AssumeExtendedResourceTimeoutDefaultSeconds = 120
)
|
||||
|
||||
// draExtendedResource stores data for extended resources backed by DRA.
|
||||
// It will remain empty when the DRAExtendedResource feature is disabled.
|
||||
type draExtendedResource struct {
|
||||
// May have extended resource backed by DRA.
|
||||
podScalarResources map[v1.ResourceName]int64
|
||||
}
|
||||
|
||||
// hasDeviceClassMappedExtendedResource returns true when the given resource list has an extended resource, that has
|
||||
// a mapping to a device class.
|
||||
func hasDeviceClassMappedExtendedResource(reqs v1.ResourceList, cache fwk.DeviceClassResolver) bool {
|
||||
for rName, rValue := range reqs {
|
||||
if rValue.IsZero() {
|
||||
// We only care about the resources requested by the pod we are trying to schedule.
|
||||
continue
|
||||
}
|
||||
if schedutil.IsDRAExtendedResourceName(rName) {
|
||||
if cache.GetDeviceClass(rName) != nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// findExtendedResourceClaim looks for the extended resource claim, i.e., the claim with special annotation
|
||||
// set to "true", and with the pod as owner. It must be called with all ResourceClaims in the cluster.
|
||||
// The returned ResourceClaim is read-only.
|
||||
func findExtendedResourceClaim(pod *v1.Pod, resourceClaims []*resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
|
||||
for _, c := range resourceClaims {
|
||||
if c.Annotations[resourceapi.ExtendedResourceClaimAnnotation] == "true" {
|
||||
for _, or := range c.OwnerReferences {
|
||||
if or.Name == pod.Name && *or.Controller && or.UID == pod.UID {
|
||||
return c
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// preFilterExtendedResources checks if there is any extended resource in the
|
||||
// pod requests that has a device class mapping, i.e., there is a device class
|
||||
// that has spec.ExtendedResourceName or its implicit extended resource name
|
||||
// matching the given extended resource in that pod requests.
|
||||
//
|
||||
// It looks for the special resource claim for the pod created from prior scheduling
|
||||
// cycle. If not found, it creates the special claim with no Requests in the Spec,
|
||||
// with a temporary UID, and the specialClaimInMemName name.
|
||||
// Either way, the special claim is stored in state.claims.
|
||||
//
|
||||
// In addition, draExtendedResource is also stored in the cycle state.
|
||||
//
|
||||
// It returns the special ResourceClaim and an error status. It returns nil for both
|
||||
// if the feature is disabled or not required for the Pod.
|
||||
func (pl *DynamicResources) preFilterExtendedResources(pod *v1.Pod, logger klog.Logger, s *stateData) (*resourceapi.ResourceClaim, *fwk.Status) {
|
||||
if !pl.fts.EnableDRAExtendedResource {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Try to build device class mapping from cache
|
||||
cache := pl.draManager.DeviceClassResolver()
|
||||
reqs := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{})
|
||||
|
||||
hasExtendedResource := hasDeviceClassMappedExtendedResource(reqs, cache)
|
||||
if !hasExtendedResource {
|
||||
return nil, nil
|
||||
}
|
||||
r := framework.NewResource(reqs)
|
||||
s.draExtendedResource.podScalarResources = r.ScalarResources
|
||||
|
||||
resourceClaims, err := pl.draManager.ResourceClaims().List()
|
||||
if err != nil {
|
||||
return nil, statusError(logger, err, "listing ResourceClaims")
|
||||
}
|
||||
|
||||
// Check if the special resource claim has been created from prior scheduling cycle.
|
||||
//
|
||||
// If it was already allocated earlier, that allocation might not be valid anymore.
|
||||
// We could try to check that, but it depends on various factors that are difficult to
|
||||
// cover (basically needs to replicate allocator logic) and if it turns out that the
|
||||
// allocation is stale, we would have to schedule with those allocated devices not
|
||||
// available for a new allocation. This situation should be rare (= binding failure),
|
||||
// so we solve it via brute-force
|
||||
// - Kick off deallocation in the background.
|
||||
// - Mark the pod as unschedulable. Successful deallocation will make it schedulable again.
|
||||
extendedResourceClaim := findExtendedResourceClaim(pod, resourceClaims)
|
||||
if extendedResourceClaim != nil {
|
||||
return extendedResourceClaim, nil
|
||||
}
|
||||
// Create one special claim for all extended resources backed by DRA in the Pod.
|
||||
// Create the ResourceClaim with pod as owner, with a generated name that uses
|
||||
// <pod name>-extended-resources- as base. The final name will get truncated if it
|
||||
// would be too long.
|
||||
return &resourceapi.ResourceClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: pod.Namespace,
|
||||
Name: specialClaimInMemName,
|
||||
// fake temporary UID for use in SignalClaimPendingAllocation
|
||||
UID: types.UID(uuid.NewUUID()),
|
||||
GenerateName: pod.Name + "-extended-resources-",
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "Pod",
|
||||
Name: pod.Name,
|
||||
UID: pod.UID,
|
||||
Controller: ptr.To(true),
|
||||
},
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
resourceapi.ExtendedResourceClaimAnnotation: "true",
|
||||
},
|
||||
},
|
||||
Spec: resourceapi.ResourceClaimSpec{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// filterExtendedResources computes the special claim's Requests based on the
|
||||
// node's Allocatable. It returns:
|
||||
// - nil if nothing needs to be allocated, all the extended resources are satisfied by device plugin, or
|
||||
// - the special claim updated to match what needs to be allocated through DRA for the node
|
||||
//
|
||||
// It returns an error when the pod's extended resource requests cannot be allocated
|
||||
// from node's Allocatable, nor matching any device class's explicit or implicit
|
||||
// ExtendedResourceName.
|
||||
func (pl *DynamicResources) filterExtendedResources(state *stateData, pod *v1.Pod, nodeInfo fwk.NodeInfo, logger klog.Logger) (*resourceapi.ResourceClaim, []v1.ContainerExtendedResourceRequest, *fwk.Status) {
|
||||
extendedResourceClaim := state.claims.extendedResourceClaim()
|
||||
if extendedResourceClaim == nil {
|
||||
// Nothing to do.
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// The claim is from the prior scheduling cycle, return unschedulable such that it can be
|
||||
// deleted at the PostFilter phase, and retry anew.
|
||||
if extendedResourceClaim.Spec.Devices.Requests != nil {
|
||||
return nil, nil, statusUnschedulable(logger, "cannot schedule extended resource claim", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "claim", klog.KObj(extendedResourceClaim))
|
||||
}
|
||||
|
||||
extendedResources := make(map[v1.ResourceName]int64)
|
||||
hasExtendedResource := false
|
||||
cache := pl.draManager.DeviceClassResolver()
|
||||
for rName, rQuant := range state.draExtendedResource.podScalarResources {
|
||||
if !schedutil.IsDRAExtendedResourceName(rName) {
|
||||
continue
|
||||
}
|
||||
// Skip in case request quantity is zero
|
||||
if rQuant == 0 {
|
||||
continue
|
||||
}
|
||||
allocatable, okScalar := nodeInfo.GetAllocatable().GetScalarResources()[rName]
|
||||
isBackedByDRA := cache.GetDeviceClass(rName) != nil
|
||||
if isBackedByDRA && allocatable == 0 {
|
||||
// node needs to provide the resource via DRA
|
||||
extendedResources[rName] = rQuant
|
||||
hasExtendedResource = true
|
||||
} else if !okScalar {
|
||||
// has request neither provided by device plugin, nor backed by DRA,
|
||||
// hence the pod does not fit the node.
|
||||
return nil, nil, statusUnschedulable(logger, "cannot fit resource", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "resource", rName)
|
||||
}
|
||||
}
|
||||
// No extended resources backed by DRA on this node.
|
||||
// The pod may have extended resources, but they are all backed by device
|
||||
// plugin, hence the noderesources plugin should have checked if the node
|
||||
// can fit the pod.
|
||||
// This dynamic resources plugin Filter phase has nothing left to do.
|
||||
if state.claims.noUserClaim() && !hasExtendedResource {
|
||||
// It cannot be allocated when reaching here, as the claim from prior scheduling cycle
|
||||
// would return unschedulable earlier in this function.
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
if extendedResourceClaim.Status.Allocation != nil {
|
||||
// If it is already allocated, then we cannot simply allocate it again.
|
||||
//
|
||||
// It cannot be allocated when reaching here, as the claim found from prior scheduling cycle
|
||||
// would return unschedulable earlier in this function.
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// Each node needs its own, potentially different variant of the claim.
|
||||
nodeExtendedResourceClaim := extendedResourceClaim.DeepCopy()
|
||||
reqs, mappings := createRequestsAndMappings(pod, extendedResources, logger, cache)
|
||||
nodeExtendedResourceClaim.Spec.Devices.Requests = reqs
|
||||
|
||||
return nodeExtendedResourceClaim, mappings, nil
|
||||
}
|
||||
|
||||
// isSpecialClaimName return true when the name is the specialClaimInMemName.
|
||||
func isSpecialClaimName(name string) bool {
|
||||
return name == specialClaimInMemName
|
||||
}
|
||||
|
||||
// deleteClaim deletes the claim after removing the finalizer from the claim, if there is any.
|
||||
func (pl *DynamicResources) deleteClaim(ctx context.Context, claim *resourceapi.ResourceClaim, logger klog.Logger) error {
|
||||
refreshClaim := false
|
||||
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
if refreshClaim {
|
||||
updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("get resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err)
|
||||
}
|
||||
claim = updatedClaim
|
||||
} else {
|
||||
refreshClaim = true
|
||||
}
|
||||
// Remove the finalizer to unblock removal first.
|
||||
builtinControllerFinalizer := slices.Index(claim.Finalizers, resourceapi.Finalizer)
|
||||
if builtinControllerFinalizer >= 0 {
|
||||
claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1)
|
||||
}
|
||||
|
||||
_, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("update resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
|
||||
logger.V(5).Info("Delete", "resourceclaim", klog.KObj(claim))
|
||||
err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func partitionContainerIndices(containers []v1.Container, numInitContainers int) ([]int, []int) {
|
||||
longLivedContainerIndices := make([]int, 0, len(containers))
|
||||
shortLivedInitContainerIndices := make([]int, 0, numInitContainers)
|
||||
for i, c := range containers {
|
||||
isInit := i < numInitContainers
|
||||
isSidecar := c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways
|
||||
if isInit && !isSidecar {
|
||||
shortLivedInitContainerIndices = append(shortLivedInitContainerIndices, i)
|
||||
continue
|
||||
}
|
||||
longLivedContainerIndices = append(longLivedContainerIndices, i)
|
||||
}
|
||||
return longLivedContainerIndices, shortLivedInitContainerIndices
|
||||
}
|
||||
|
||||
// createResourceRequestAndMappings returns the request and mappings for the given container and resource.
|
||||
// reusableRequests is a list of other DeviceRequests this container can use before requesting its own.
|
||||
// items in reusableRequests may be nil.
|
||||
// The returned request may be nil if no additional request was required.
|
||||
// The returned mappings may be empty if this container does not use this resource.
|
||||
func createResourceRequestAndMappings(containerIndex int, container *v1.Container, rName v1.ResourceName, className string, reusableRequests []*resourceapi.DeviceRequest) (*resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) {
|
||||
var mappings []v1.ContainerExtendedResourceRequest
|
||||
creqs := container.Resources.Requests
|
||||
if creqs == nil {
|
||||
return nil, nil
|
||||
}
|
||||
var rQuant resource.Quantity
|
||||
var ok bool
|
||||
if rQuant, ok = creqs[rName]; !ok {
|
||||
return nil, nil
|
||||
}
|
||||
crq, ok := (&rQuant).AsInt64()
|
||||
if !ok || crq == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
sum := int64(0)
|
||||
for _, r := range reusableRequests {
|
||||
if r != nil {
|
||||
sum += r.Exactly.Count
|
||||
mappings = append(mappings, v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: container.Name,
|
||||
ResourceName: rName.String(),
|
||||
RequestName: r.Name,
|
||||
})
|
||||
if sum >= crq {
|
||||
return nil, mappings
|
||||
}
|
||||
}
|
||||
}
|
||||
keys := make([]string, 0, len(creqs))
|
||||
for k := range creqs {
|
||||
keys = append(keys, k.String())
|
||||
}
|
||||
// resource requests in a container is a map, their names must
|
||||
// be sorted to determine the resource's index order.
|
||||
slice.SortStrings(keys)
|
||||
ridx := 0
|
||||
for j := range keys {
|
||||
if keys[j] == rName.String() {
|
||||
ridx = j
|
||||
break
|
||||
}
|
||||
}
|
||||
// containerIndex is the index of the container in the list of initContainers + containers.
|
||||
// ridx is the index of the extended resource request in the sorted all requests in the container.
|
||||
// crq is the quantity of the extended resource request.
|
||||
reqName := fmt.Sprintf("container-%d-request-%d", containerIndex, ridx)
|
||||
deviceReq := resourceapi.DeviceRequest{
|
||||
Name: reqName, // need to be container name index - extended resource name index
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: className,
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: crq - sum, // the extra devices to request
|
||||
},
|
||||
}
|
||||
mappings = append(mappings, v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: container.Name,
|
||||
ResourceName: rName.String(),
|
||||
RequestName: reqName,
|
||||
})
|
||||
|
||||
return &deviceReq, mappings
|
||||
}
|
||||
|
||||
func createRequestsAndMappings(pod *v1.Pod, extendedResources map[v1.ResourceName]int64, logger klog.Logger, deviceClassMapping fwk.DeviceClassResolver) ([]resourceapi.DeviceRequest, []v1.ContainerExtendedResourceRequest) {
|
||||
containers := slices.Clone(pod.Spec.InitContainers)
|
||||
containers = append(containers, pod.Spec.Containers...)
|
||||
longLivedContainerIndices, shortLivedInitContainerIndices := partitionContainerIndices(containers, len(pod.Spec.InitContainers))
|
||||
|
||||
// all requests across all containers and resource types
|
||||
var deviceRequests []resourceapi.DeviceRequest
|
||||
// all mappings across all containers and resource types
|
||||
var mappings []v1.ContainerExtendedResourceRequest
|
||||
|
||||
// Sort resource names to ensure deterministic ordering of device requests and mappings.
|
||||
// Maps have non-deterministic iteration order in Go, so we extract and sort the keys.
|
||||
resourceNames := make([]v1.ResourceName, 0, len(extendedResources))
|
||||
for resource := range extendedResources {
|
||||
resourceNames = append(resourceNames, resource)
|
||||
}
|
||||
sort.Slice(resourceNames, func(i, j int) bool {
|
||||
return resourceNames[i] < resourceNames[j]
|
||||
})
|
||||
|
||||
for _, resource := range resourceNames {
|
||||
class := deviceClassMapping.GetDeviceClass(resource)
|
||||
// skip if the resource does not map to a device class
|
||||
if class == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// shortLivedResourceMappings is the mapping of container+resource→request for short lived containers (init non-sidecar container)
|
||||
var shortLivedResourceMappings []v1.ContainerExtendedResourceRequest
|
||||
// longLivedResourceMappings is the mapping of container+resource→request for long lived containers (init sidecar or regular container)
|
||||
var longLivedResourceMappings []v1.ContainerExtendedResourceRequest
|
||||
|
||||
// longLivedResourceRequests is the list of requests for a given resource by long-lived containers.
|
||||
// The length of this list is the same as the length of containers.
|
||||
// Entries may be nil if the container at that index did not produce a request for that resource.
|
||||
// Requests at later indices are reusable by non-sidecar initContainers at earlier indices.
|
||||
longLivedResourceRequests := make([]*resourceapi.DeviceRequest, len(containers))
|
||||
for _, i := range longLivedContainerIndices {
|
||||
containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, nil)
|
||||
longLivedResourceRequests[i] = containerRequest // might be nil
|
||||
longLivedResourceMappings = append(longLivedResourceMappings, containerMappings...) // might be zero-length
|
||||
}
|
||||
|
||||
// maxShortLivedResourceRequest is the maximum request for a given resource by short-lived containers
|
||||
var maxShortLivedResourceRequest *resourceapi.DeviceRequest
|
||||
// shortLivedRequestNames is all request names for a given resource by short-lived containers. All mappings to any name in
|
||||
// this set will be replaced by maxShortLivedResourceRequest.Name.
|
||||
shortLivedRequestNames := sets.New[string]()
|
||||
for _, i := range shortLivedInitContainerIndices {
|
||||
containerRequest, containerMappings := createResourceRequestAndMappings(i, &containers[i], resource, class.Name, longLivedResourceRequests[i:])
|
||||
if containerRequest != nil {
|
||||
shortLivedRequestNames.Insert(containerRequest.Name)
|
||||
if maxShortLivedResourceRequest == nil || maxShortLivedResourceRequest.Exactly.Count < containerRequest.Exactly.Count {
|
||||
maxShortLivedResourceRequest = containerRequest
|
||||
}
|
||||
}
|
||||
shortLivedResourceMappings = append(shortLivedResourceMappings, containerMappings...) // might be zero-length
|
||||
}
|
||||
|
||||
// rewrite mappings to short-lived requests to use the maximum short-lived request name
|
||||
if maxShortLivedResourceRequest != nil && len(shortLivedRequestNames) > 1 {
|
||||
shortLivedRequestNames.Delete(maxShortLivedResourceRequest.Name)
|
||||
for i := range shortLivedResourceMappings {
|
||||
if shortLivedRequestNames.Has(shortLivedResourceMappings[i].RequestName) {
|
||||
shortLivedResourceMappings[i].RequestName = maxShortLivedResourceRequest.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// append non-nil requests
|
||||
if maxShortLivedResourceRequest != nil {
|
||||
deviceRequests = append(deviceRequests, *maxShortLivedResourceRequest)
|
||||
}
|
||||
for _, request := range longLivedResourceRequests {
|
||||
if request != nil {
|
||||
deviceRequests = append(deviceRequests, *request)
|
||||
}
|
||||
}
|
||||
// append mappings
|
||||
mappings = append(mappings, longLivedResourceMappings...)
|
||||
mappings = append(mappings, shortLivedResourceMappings...)
|
||||
}
|
||||
|
||||
sort.Slice(deviceRequests, func(i, j int) bool {
|
||||
return deviceRequests[i].Name < deviceRequests[j].Name
|
||||
})
|
||||
return deviceRequests, mappings
|
||||
}
|
||||
|
||||
// waitForExtendedClaimInAssumeCache polls the assume cache until the extended resource claim
|
||||
// becomes visible. This is necessary because extended resource claims are created in the API
|
||||
// server, and the informer update may not have reached the assume cache yet.
|
||||
//
|
||||
// AssumeClaimAfterAPICall returns ErrNotFound when the informer update hasn't arrived,
|
||||
// so we poll with a timeout.
|
||||
func (pl *DynamicResources) waitForExtendedClaimInAssumeCache(
|
||||
ctx context.Context,
|
||||
logger klog.Logger,
|
||||
claim *resourceapi.ResourceClaim,
|
||||
) {
|
||||
pollErr := wait.PollUntilContextTimeout(
|
||||
ctx,
|
||||
1*time.Second,
|
||||
time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second,
|
||||
true,
|
||||
func(ctx context.Context) (bool, error) {
|
||||
if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
|
||||
if errors.Is(err, assumecache.ErrNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err)
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
|
||||
if pollErr != nil {
|
||||
logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr)
|
||||
// Note: We log but don't fail - the claim was created successfully
|
||||
}
|
||||
}
|
||||
|
||||
// createExtendedResourceClaimInAPI creates an extended resource claim in the API server.
|
||||
func (pl *DynamicResources) createExtendedResourceClaimInAPI(
|
||||
ctx context.Context,
|
||||
logger klog.Logger,
|
||||
pod *v1.Pod,
|
||||
nodeName string,
|
||||
state *stateData,
|
||||
) (*resourceapi.ResourceClaim, error) {
|
||||
logger.V(5).Info("preparing to create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName)
|
||||
// Get the node-specific claim that was prepared during Filter phase
|
||||
nodeAllocation, ok := state.nodeAllocations[nodeName]
|
||||
if !ok || nodeAllocation.extendedResourceClaim == nil {
|
||||
return nil, fmt.Errorf("extended resource claim not found for node %s", nodeName)
|
||||
}
|
||||
claim := nodeAllocation.extendedResourceClaim.DeepCopy()
|
||||
|
||||
logger.V(5).Info("create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))
|
||||
// Clear fields which must or can not be set during creation.
|
||||
claim.Status.Allocation = nil
|
||||
claim.Name = ""
|
||||
claim.UID = ""
|
||||
|
||||
createdClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Create(ctx, claim, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
metrics.ResourceClaimCreatesTotal.WithLabelValues("failure").Inc()
|
||||
return nil, fmt.Errorf("create claim for extended resources %v: %w", klog.KObj(claim), err)
|
||||
}
|
||||
metrics.ResourceClaimCreatesTotal.WithLabelValues("success").Inc()
|
||||
logger.V(5).Info("created claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(createdClaim))
|
||||
|
||||
return createdClaim, nil
|
||||
}
|
||||
|
||||
// patchPodExtendedResourceClaimStatus updates the pod's status with information about
|
||||
// the extended resource claim.
|
||||
func (pl *DynamicResources) patchPodExtendedResourceClaimStatus(
|
||||
ctx context.Context,
|
||||
pod *v1.Pod,
|
||||
claim *resourceapi.ResourceClaim,
|
||||
nodeName string,
|
||||
state *stateData,
|
||||
) error {
|
||||
var cer []v1.ContainerExtendedResourceRequest
|
||||
if nodeAllocation, ok := state.nodeAllocations[nodeName]; ok {
|
||||
cer = nodeAllocation.containerResourceRequestMappings
|
||||
}
|
||||
if len(cer) == 0 {
|
||||
return fmt.Errorf("nil or empty request mappings, no update of pod %s/%s ExtendedResourceClaimStatus", pod.Namespace, pod.Name)
|
||||
}
|
||||
|
||||
podStatusCopy := pod.Status.DeepCopy()
|
||||
podStatusCopy.ExtendedResourceClaimStatus = &v1.PodExtendedResourceClaimStatus{
|
||||
RequestMappings: cer,
|
||||
ResourceClaimName: claim.Name,
|
||||
}
|
||||
err := schedutil.PatchPodStatus(ctx, pl.clientset, pod.Name, pod.Namespace, &pod.Status, podStatusCopy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update pod %s/%s ExtendedResourceClaimStatus: %w", pod.Namespace, pod.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// unreserveExtendedResourceClaim cleans up the scheduler-owned extended resource claim
|
||||
// when scheduling fails. It reverts the assume cache, and deletes the claim from the API
|
||||
// server if it was already created.
|
||||
func (pl *DynamicResources) unreserveExtendedResourceClaim(ctx context.Context, logger klog.Logger, pod *v1.Pod, state *stateData) {
|
||||
extendedResourceClaim := state.claims.extendedResourceClaim()
|
||||
if extendedResourceClaim == nil {
|
||||
// there is no extended resource claim
|
||||
return
|
||||
}
|
||||
|
||||
// If the claim was marked as pending allocation (in-flight), remove that marker and restore
|
||||
// the assumed claim state to what it was before this scheduling attempt.
|
||||
if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(state.claims.getInitialExtendedResourceClaimUID()); deleted {
|
||||
pl.draManager.ResourceClaims().AssumedClaimRestore(extendedResourceClaim.Namespace, extendedResourceClaim.Name)
|
||||
}
|
||||
if isSpecialClaimName(extendedResourceClaim.Name) {
|
||||
// In memory temporary extended resource claim does not need to be deleted
|
||||
return
|
||||
}
|
||||
// Claim was written to API server, need to delete it to prevent orphaned resources.
|
||||
logger.V(5).Info("delete extended resource backed by DRA", "resourceclaim", klog.KObj(extendedResourceClaim), "pod", klog.KObj(pod), "claim.UID", extendedResourceClaim.UID)
|
||||
extendedResourceClaim = extendedResourceClaim.DeepCopy()
|
||||
if err := pl.deleteClaim(ctx, extendedResourceClaim, logger); err != nil {
|
||||
logger.Error(err, "delete", "resourceclaim", klog.KObj(extendedResourceClaim))
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,565 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package dynamicresources
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
resourceapi "k8s.io/api/resource/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
fwk "k8s.io/kube-scheduler/framework"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
)
|
||||
|
||||
func Test_createRequestsAndMappings_requests(t *testing.T) {
|
||||
pod1 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
pod2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
|
||||
podInit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit3 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
SidecarReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
|
||||
res := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
}
|
||||
res2 := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "1"): 2,
|
||||
}
|
||||
resInit := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "init"): 2,
|
||||
}
|
||||
devMap := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
}
|
||||
devMap2 := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "1"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class1",
|
||||
},
|
||||
},
|
||||
}
|
||||
devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "init"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "classInit",
|
||||
},
|
||||
},
|
||||
}
|
||||
devReq := resourceapi.DeviceRequest{
|
||||
Name: "container-0-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq2 := resourceapi.DeviceRequest{
|
||||
Name: "container-0-request-1",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class1",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReq3 := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class1",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReqInit := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReqSidecar := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq2Init := resourceapi.DeviceRequest{
|
||||
Name: "container-1-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReq6Init := resourceapi.DeviceRequest{
|
||||
Name: "container-0-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
devReq3Init := resourceapi.DeviceRequest{
|
||||
Name: "container-2-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq4Init := resourceapi.DeviceRequest{
|
||||
Name: "container-3-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "class",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 1,
|
||||
},
|
||||
}
|
||||
devReq5Init := resourceapi.DeviceRequest{
|
||||
Name: "container-2-request-0",
|
||||
Exactly: &resourceapi.ExactDeviceRequest{
|
||||
DeviceClassName: "classInit",
|
||||
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
|
||||
Count: 2,
|
||||
},
|
||||
}
|
||||
|
||||
testcases := map[string]struct {
|
||||
pod *v1.Pod
|
||||
extendedResources map[v1.ResourceName]int64
|
||||
cache fwk.DeviceClassResolver
|
||||
wantDeviceRequests []resourceapi.DeviceRequest
|
||||
}{
|
||||
"nil": {
|
||||
pod: pod1,
|
||||
wantDeviceRequests: nil,
|
||||
},
|
||||
"one resource match": {
|
||||
pod: pod1,
|
||||
extendedResources: res,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq},
|
||||
},
|
||||
"one resource match, one resource not match": {
|
||||
pod: pod1,
|
||||
extendedResources: res2,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq},
|
||||
},
|
||||
"two resources match": {
|
||||
pod: pod1,
|
||||
extendedResources: res2,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap2},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq2},
|
||||
},
|
||||
"two containers match": {
|
||||
pod: pod2,
|
||||
extendedResources: res2,
|
||||
cache: &mockDeviceClassResolver{mapping: devMap2},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq, devReq3},
|
||||
},
|
||||
"one init container, one regular container": {
|
||||
pod: podInit,
|
||||
extendedResources: resInit,
|
||||
cache: &mockDeviceClassResolver{mapping: devMapInit},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq6Init, devReqInit},
|
||||
},
|
||||
"two init containers, one regular container": {
|
||||
pod: podInit2,
|
||||
extendedResources: resInit,
|
||||
cache: &mockDeviceClassResolver{mapping: devMapInit},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReq2Init, devReq3Init},
|
||||
},
|
||||
"three init containers, one sidecar, one regular container": {
|
||||
pod: podInit3,
|
||||
extendedResources: resInit,
|
||||
cache: &mockDeviceClassResolver{mapping: devMapInit},
|
||||
wantDeviceRequests: []resourceapi.DeviceRequest{devReqSidecar, devReq5Init, devReq4Init},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testcases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
gotDeviceRequests, _ := createRequestsAndMappings(tc.pod, tc.extendedResources, logger, tc.cache)
|
||||
if len(tc.wantDeviceRequests) != len(gotDeviceRequests) {
|
||||
t.Fatalf("different length, want %#v, len=%v, got %#v, len=%v", tc.wantDeviceRequests, len(tc.wantDeviceRequests), gotDeviceRequests, len(gotDeviceRequests))
|
||||
}
|
||||
// gotDeviceRequests should already be sorted by createRequestsAndMappings
|
||||
for i, r := range tc.wantDeviceRequests {
|
||||
if r.Name != gotDeviceRequests[i].Name {
|
||||
t.Errorf("different name, want %#v, got %#v", r, gotDeviceRequests[i])
|
||||
}
|
||||
if r.Exactly.DeviceClassName != gotDeviceRequests[i].Exactly.DeviceClassName {
|
||||
t.Errorf("different deviceClassName, want %#v, got %#v", r, gotDeviceRequests[i])
|
||||
}
|
||||
if r.Exactly.AllocationMode != gotDeviceRequests[i].Exactly.AllocationMode {
|
||||
t.Errorf("different allocationMode, want %#v, got %#v", r, gotDeviceRequests[i])
|
||||
}
|
||||
if r.Exactly.Count != gotDeviceRequests[i].Exactly.Count {
|
||||
t.Errorf("different count, want %#v, got %#v", r.Exactly.Count, gotDeviceRequests[i].Exactly.Count)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_createRequestsAndMappings_mappings(t *testing.T) {
|
||||
pod1 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
pod1InitImplicit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2",
|
||||
}).
|
||||
Obj()
|
||||
pod2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "1"): "2",
|
||||
}).
|
||||
Obj()
|
||||
|
||||
podInit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit2 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInitImplicit := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): "2",
|
||||
}).
|
||||
Obj()
|
||||
podInit3 := st.MakePod().Name(podName).Namespace(namespace).
|
||||
UID(podUID).
|
||||
Res(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
SidecarReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "1",
|
||||
}).
|
||||
InitReq(map[v1.ResourceName]string{
|
||||
v1.ResourceName(extendedResourceName + "init"): "2",
|
||||
}).
|
||||
Obj()
|
||||
res := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "1"): 2,
|
||||
}
|
||||
resInit := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "init"): 2,
|
||||
}
|
||||
resInitImplicit := map[v1.ResourceName]int64{
|
||||
v1.ResourceName(extendedResourceName): 1,
|
||||
v1.ResourceName(extendedResourceName + "init"): 2,
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): 2,
|
||||
}
|
||||
devMap := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "1"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class1",
|
||||
},
|
||||
},
|
||||
}
|
||||
devMapInit := map[v1.ResourceName]*resourceapi.DeviceClass{
|
||||
v1.ResourceName(extendedResourceName): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "class",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(extendedResourceName + "init"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "classInit",
|
||||
},
|
||||
},
|
||||
v1.ResourceName(resourceapi.ResourceDeviceClassPrefix + "classInit"): {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "classInit",
|
||||
},
|
||||
},
|
||||
}
|
||||
cer := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
cer1 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName + "1",
|
||||
RequestName: "container-0-request-1",
|
||||
}
|
||||
cer2 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con1",
|
||||
ResourceName: extendedResourceName + "1",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cer3 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cer4 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-2-request-0",
|
||||
}
|
||||
cer5 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "con0",
|
||||
ResourceName: extendedResourceName,
|
||||
RequestName: "container-3-request-0",
|
||||
}
|
||||
cerInit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit0 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
cerInit1 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit2 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con1",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit3 := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con2",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-2-request-0",
|
||||
}
|
||||
cerSidecar := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "sidecar-con1",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
|
||||
cerInitImplicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
cerInit4Implicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: extendedResourceName + "init",
|
||||
RequestName: "container-0-request-1",
|
||||
}
|
||||
cerInit2Implicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con1",
|
||||
ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit",
|
||||
RequestName: "container-1-request-0",
|
||||
}
|
||||
cerInit3Implicit := v1.ContainerExtendedResourceRequest{
|
||||
ContainerName: "init-con0",
|
||||
ResourceName: resourceapi.ResourceDeviceClassPrefix + "classInit",
|
||||
RequestName: "container-0-request-0",
|
||||
}
|
||||
|
||||
testcases := map[string]struct {
|
||||
pod *v1.Pod
|
||||
extnededResources map[v1.ResourceName]int64
|
||||
deviceClassMapping fwk.DeviceClassResolver
|
||||
wantReqMappings []v1.ContainerExtendedResourceRequest
|
||||
}{
|
||||
"one container, two requests": {
|
||||
pod: pod1,
|
||||
extnededResources: res,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMap},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer1},
|
||||
},
|
||||
"one container, one explicit and one implicit request": {
|
||||
pod: pod1InitImplicit,
|
||||
extnededResources: resInitImplicit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit3Implicit, cerInit4Implicit},
|
||||
},
|
||||
"two containers, two requests": {
|
||||
pod: pod2,
|
||||
extnededResources: res,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMap},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cer, cer2},
|
||||
},
|
||||
"one init container, one regular container, one request": {
|
||||
pod: podInit,
|
||||
extnededResources: resInit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit0, cer3},
|
||||
},
|
||||
"three containers (two are init container), two requests": {
|
||||
pod: podInit2,
|
||||
extnededResources: resInit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit, cerInit2, cer4},
|
||||
},
|
||||
"three containers (two are init container), both explicit and implicit resources": {
|
||||
pod: podInitImplicit,
|
||||
extnededResources: resInitImplicit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInitImplicit, cerInit2Implicit, cer4},
|
||||
},
|
||||
"four containers (two are init container, one sidecar), three requests": {
|
||||
pod: podInit3,
|
||||
extnededResources: resInit,
|
||||
deviceClassMapping: &mockDeviceClassResolver{devMapInit},
|
||||
wantReqMappings: []v1.ContainerExtendedResourceRequest{cerInit1, cerSidecar, cerInit3, cer5},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testcases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
_, gotReqMappings := createRequestsAndMappings(tc.pod, tc.extnededResources, logger, tc.deviceClassMapping)
|
||||
if len(tc.wantReqMappings) != len(gotReqMappings) {
|
||||
t.Fatalf("different length, want %#v, got %#v", tc.wantReqMappings, gotReqMappings)
|
||||
}
|
||||
sort.Slice(gotReqMappings, func(i, j int) bool {
|
||||
if gotReqMappings[i].RequestName < gotReqMappings[j].RequestName {
|
||||
return true
|
||||
}
|
||||
if gotReqMappings[i].RequestName > gotReqMappings[j].RequestName {
|
||||
return false
|
||||
}
|
||||
return gotReqMappings[i].ContainerName < gotReqMappings[j].ContainerName
|
||||
})
|
||||
for i, r := range tc.wantReqMappings {
|
||||
if r.RequestName != gotReqMappings[i].RequestName {
|
||||
t.Errorf("different request name, want %#v, got %#v", r, gotReqMappings[i])
|
||||
}
|
||||
if r.ContainerName != gotReqMappings[i].ContainerName {
|
||||
t.Errorf("different container name, want %#v, got %#v", r, gotReqMappings[i])
|
||||
}
|
||||
if r.ResourceName != gotReqMappings[i].ResourceName {
|
||||
t.Errorf("different resource name, want %#v, got %#v", r, gotReqMappings[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue