diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go
index 074601feca3..4f67d04d996 100644
--- a/pkg/kubelet/allocation/allocation_manager.go
+++ b/pkg/kubelet/allocation/allocation_manager.go
@@ -18,7 +18,6 @@ package allocation
 
 import (
 	"context"
-	"fmt"
 	"path/filepath"
 	"slices"
 	"sync"
@@ -34,21 +33,15 @@ import (
 	resourcehelper "k8s.io/component-helpers/resource"
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-	"k8s.io/kubernetes/pkg/api/v1/resource"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
-	"k8s.io/kubernetes/pkg/kubelet/cm"
-	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
-	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/status"
-	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 )
 
 // podStatusManagerStateFile is the file name where status manager stores its state
@@ -121,12 +114,10 @@ type Manager interface {
 type manager struct {
 	allocated state.State
 
-	admitHandlers           lifecycle.PodAdmitHandlers
-	containerRuntime        kubecontainer.Runtime
-	statusManager           status.Manager
-	sourcesReady            config.SourcesReady
-	nodeConfig              cm.NodeConfig
-	nodeAllocatableAbsolute v1.ResourceList
+	admitHandlers    lifecycle.PodAdmitHandlers
+	containerRuntime kubecontainer.Runtime
+	statusManager    status.Manager
+	sourcesReady     config.SourcesReady
 
 	ticker         *time.Ticker
 	triggerPodSync func(pod *v1.Pod)
@@ -139,10 +130,7 @@ type manager struct {
 	recorder record.EventRecorderLogger
 }
 
-func NewManager(
-	checkpointDirectory string,
-	nodeConfig cm.NodeConfig,
-	nodeAllocatableAbsolute v1.ResourceList,
+func NewManager(checkpointDirectory string,
 	statusManager status.Manager,
 	triggerPodSync func(pod *v1.Pod),
 	getActivePods func() []*v1.Pod,
@@ -156,11 +144,9 @@ func NewManager(
 	return &manager{
 		allocated: newStateImpl(logger, checkpointDirectory, allocatedPodsStateFile),
 
-		statusManager:           statusManager,
-		admitHandlers:           lifecycle.PodAdmitHandlers{},
-		sourcesReady:            sourcesReady,
-		nodeConfig:              nodeConfig,
-		nodeAllocatableAbsolute: nodeAllocatableAbsolute,
+		statusManager: statusManager,
+		admitHandlers: lifecycle.PodAdmitHandlers{},
+		sourcesReady:  sourcesReady,
 
 		ticker:         time.NewTicker(initialRetryDelay),
 		triggerPodSync: triggerPodSync,
@@ -189,8 +175,6 @@ func newStateImpl(logger klog.Logger, checkpointDirectory, checkpointName string
 
 // NewInMemoryManager returns an allocation manager that doesn't persist state.
 // For testing purposes only!
 func NewInMemoryManager(
-	nodeConfig cm.NodeConfig,
-	nodeAllocatableAbsolute v1.ResourceList,
 	statusManager status.Manager,
 	triggerPodSync func(pod *v1.Pod),
 	getActivePods func() []*v1.Pod,
@@ -201,11 +185,9 @@ func NewInMemoryManager(
 	return &manager{
 		allocated: state.NewStateMemory(nil),
 
-		statusManager:           statusManager,
-		admitHandlers:           lifecycle.PodAdmitHandlers{},
-		sourcesReady:            sourcesReady,
-		nodeConfig:              nodeConfig,
-		nodeAllocatableAbsolute: nodeAllocatableAbsolute,
+		statusManager: statusManager,
+		admitHandlers: lifecycle.PodAdmitHandlers{},
+		sourcesReady:  sourcesReady,
 
 		ticker:         time.NewTicker(initialRetryDelay),
 		triggerPodSync: triggerPodSync,
@@ -554,7 +536,7 @@ func (m *manager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, strin
 
 	// Check if we can admit the pod; if so, update the allocation.
 	allocatedPods := m.getAllocatedPods(activePods)
-	ok, reason, message := m.canAdmitPod(logger, allocatedPods, pod)
+	ok, reason, message := m.canAdmitPod(logger, allocatedPods, pod, lifecycle.AddOperation)
 
 	if ok && utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
 		// Checkpoint the resource values at which the Pod has been admitted or resized.
@@ -582,42 +564,20 @@ func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
 }
 
 func (m *manager) handlePodResourcesResize(logger klog.Logger, pod *v1.Pod) (bool, error) {
-	allocatedPod, updated := m.UpdatePodFromAllocation(pod)
+	_, updated := m.UpdatePodFromAllocation(pod)
 	if !updated {
 		// Desired resources == allocated resources. Pod allocation does not need to be updated.
 		m.statusManager.ClearPodResizePendingCondition(pod.UID)
 		return false, nil
-
-	} else if resizable, msg, reason := IsInPlacePodVerticalScalingAllowed(pod); !resizable {
-		// If there is a pending resize but the resize is not allowed, always use the allocated resources.
-		metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
-		m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg, pod.Generation)
-		return false, nil
-
-	} else if resizeNotAllowed, msg := disallowResizeForSwappableContainers(m.containerRuntime, pod, allocatedPod); resizeNotAllowed {
-		// If this resize involve swap recalculation, set as infeasible, as IPPR with swap is not supported for beta.
-		metrics.PodInfeasibleResizes.WithLabelValues("swap_limitation").Inc()
-		m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg, pod.Generation)
-		return false, nil
-	}
-
-	if !apiequality.Semantic.DeepEqual(pod.Spec.Resources, allocatedPod.Spec.Resources) {
-		if resizable, msg, reason := IsInPlacePodLevelResourcesVerticalScalingAllowed(pod); !resizable {
-			// If there is a pending pod-level resources resize but the resize is not allowed, always use the allocated resources.
-			metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
-			m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg, pod.Generation)
-			return false, nil
-		}
 	}
 
 	// Desired resources != allocated resources. Can we update the allocation to the desired resources?
-	fit, reason, message := m.canResizePod(logger, m.getAllocatedPods(m.getActivePods()), pod)
+	fit, reason, message := m.canAdmitPod(logger, m.getAllocatedPods(m.getActivePods()), pod, lifecycle.ResizeOperation)
 	if fit {
 		// Update pod resource allocation checkpoint
 		if err := m.SetAllocatedResources(pod); err != nil {
 			return false, err
 		}
-		allocatedPod = pod
 		m.statusManager.ClearPodResizePendingCondition(pod.UID)
 
 		// Clear any errors that may have been surfaced from a previous resize and update the
@@ -625,9 +585,8 @@ func (m *manager) handlePodResourcesResize(logger klog.Logger, pod *v1.Pod) (boo
 		m.statusManager.ClearPodResizeInProgressCondition(pod.UID)
 		m.statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", pod.Generation)
 
-		msg := events.PodResizeStartedMsg(logger, allocatedPod, pod.Generation)
+		msg := events.PodResizeStartedMsg(logger, pod, pod.Generation)
 		m.recorder.WithLogger(logger).Eventf(pod, v1.EventTypeNormal, events.ResizeStarted, msg)
-
 		return true, nil
 	}
 
@@ -645,40 +604,6 @@ func (m *manager) handlePodResourcesResize(logger klog.Logger, pod *v1.Pod) (boo
 	return false, nil
 }
 
-func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desiredPod, allocatedPod *v1.Pod) (bool, string) {
-	if desiredPod == nil || allocatedPod == nil {
-		return false, ""
-	}
-	restartableMemoryResizePolicy := func(resizePolicies []v1.ContainerResizePolicy) bool {
-		for _, policy := range resizePolicies {
-			if policy.ResourceName == v1.ResourceMemory {
-				return policy.RestartPolicy == v1.RestartContainer
-			}
-		}
-		return false
-	}
-	allocatedContainers := make(map[string]v1.Container)
-	for _, container := range append(allocatedPod.Spec.Containers, allocatedPod.Spec.InitContainers...) {
-		allocatedContainers[container.Name] = container
-	}
-	for _, desiredContainer := range append(desiredPod.Spec.Containers, desiredPod.Spec.InitContainers...) {
-		allocatedContainer, ok := allocatedContainers[desiredContainer.Name]
-		if !ok {
-			continue
-		}
-		origMemRequest := desiredContainer.Resources.Requests[v1.ResourceMemory]
-		newMemRequest := allocatedContainer.Resources.Requests[v1.ResourceMemory]
-		if !origMemRequest.Equal(newMemRequest) && !restartableMemoryResizePolicy(allocatedContainer.ResizePolicy) {
-			aSwapBehavior := runtime.GetContainerSwapBehavior(desiredPod, &desiredContainer)
-			bSwapBehavior := runtime.GetContainerSwapBehavior(allocatedPod, &allocatedContainer)
-			if aSwapBehavior != kubetypes.NoSwap || bSwapBehavior != kubetypes.NoSwap {
-				return true, "In-place resize of containers with swap is not supported."
-			}
-		}
-	}
-	return false, ""
-}
-
 // canAdmitPod determines if a pod can be admitted, and gives a reason if it
 // cannot. "pod" is new pod, while "pods" are all admitted pods
 // The function returns a boolean value indicating whether the pod
@@ -686,15 +611,15 @@ func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desired
 // the pod cannot be admitted.
 // allocatedPods should represent the pods that have already been admitted, along with their
 // admitted (allocated) resources.
-func (m *manager) canAdmitPod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
+func (m *manager) canAdmitPod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod, operation lifecycle.Operation) (bool, string, string) {
 	// Filter out the pod being evaluated.
 	allocatedPods = slices.DeleteFunc(allocatedPods, func(p *v1.Pod) bool { return p.UID == pod.UID })
 
 	// If any handler rejects, the pod is rejected.
-	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: allocatedPods}
+	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: allocatedPods, Operation: operation}
 	for _, podAdmitHandler := range m.admitHandlers {
 		if result := podAdmitHandler.Admit(attrs); !result.Admit {
-			logger.Info("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
+			logger.Info("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message, "operation", operation)
 			return false, result.Reason, result.Message
 		}
 	}
@@ -702,73 +627,6 @@ func (m *manager) canAdmitPod(logger klog.Logger, allocatedPods []*v1.Pod, pod *
 	return true, "", ""
 }
 
-// canResizePod determines if the requested resize is currently feasible.
-// pod should hold the desired (pre-allocated) spec.
-// Returns true if the resize can proceed; returns a reason and message
-// otherwise.
-func (m *manager) canResizePod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
-	// TODO: Move this logic into a PodAdmitHandler by introducing an operation field to
-	// lifecycle.PodAdmitAttributes, and combine canResizePod with canAdmitPod.
-	if v1qos.GetPodQOS(pod) == v1.PodQOSGuaranteed {
-		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveCPUs) &&
-			m.nodeConfig.CPUManagerPolicy == string(cpumanager.PolicyStatic) &&
-			m.guaranteedPodResourceResizeRequired(pod, v1.ResourceCPU) {
-			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside CPU Manager policy \"%s\"", string(cpumanager.PolicyStatic))
-			logger.V(3).Info(msg, "pod", format.Pod(pod))
-			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_cpu_manager_static_policy").Inc()
-			return false, v1.PodReasonInfeasible, msg
-		}
-		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveMemory) &&
-			m.nodeConfig.MemoryManagerPolicy == string(memorymanager.PolicyTypeStatic) &&
-			m.guaranteedPodResourceResizeRequired(pod, v1.ResourceMemory) {
-			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside Memory Manager policy \"%s\"", string(memorymanager.PolicyTypeStatic))
-			logger.V(3).Info(msg, "pod", format.Pod(pod))
-			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_memory_manager_static_policy").Inc()
-			return false, v1.PodReasonInfeasible, msg
-		}
-	}
-
-	cpuAvailable := m.nodeAllocatableAbsolute.Cpu().MilliValue()
-	memAvailable := m.nodeAllocatableAbsolute.Memory().Value()
-	cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU)
-	memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory)
-	if cpuRequests > cpuAvailable || memRequests > memAvailable {
-		var msg string
-		if memRequests > memAvailable {
-			msg = fmt.Sprintf("memory, requested: %d, capacity: %d", memRequests, memAvailable)
-		} else {
-			msg = fmt.Sprintf("cpu, requested: %d, capacity: %d", cpuRequests, cpuAvailable)
-		}
-		msg = "Node didn't have enough capacity: " + msg
-		logger.V(3).Info(msg, "pod", klog.KObj(pod))
-		metrics.PodInfeasibleResizes.WithLabelValues("insufficient_node_allocatable").Inc()
-		return false, v1.PodReasonInfeasible, msg
-	}
-
-	if ok, failReason, failMessage := m.canAdmitPod(logger, allocatedPods, pod); !ok {
-		// Log reason and return.
- logger.V(3).Info("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage) - return false, v1.PodReasonDeferred, failMessage - } - - return true, "", "" -} - -func (m *manager) guaranteedPodResourceResizeRequired(pod *v1.Pod, resourceName v1.ResourceName) bool { - for container, containerType := range podutil.ContainerIter(&pod.Spec, podutil.InitContainers|podutil.Containers) { - if !IsResizableContainer(container, containerType) { - continue - } - requestedResources := container.Resources - allocatedresources, _ := m.GetContainerResourceAllocation(pod.UID, container.Name) - // For Guaranteed pods, requests must equal limits, so checking requests is sufficient. - if !requestedResources.Requests[resourceName].Equal(allocatedresources.Requests[resourceName]) { - return true - } - } - return false -} - func (m *manager) getAllocatedPods(activePods []*v1.Pod) []*v1.Pod { if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { return activePods diff --git a/pkg/kubelet/allocation/allocation_manager_test.go b/pkg/kubelet/allocation/allocation_manager_test.go index 3bb2442c656..75f0c847051 100644 --- a/pkg/kubelet/allocation/allocation_manager_test.go +++ b/pkg/kubelet/allocation/allocation_manager_test.go @@ -2390,6 +2390,7 @@ func TestRecordPodDeferredAcceptedResizes(t *testing.T) { func makeAllocationManager(t *testing.T, runtime *containertest.FakeRuntime, allocatedPods []*v1.Pod, nodeConfig *cm.NodeConfig) Manager { t.Helper() + logger, _ := ktesting.NewTestContext(t) statusManager := status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker()) var containerManager *cm.FakeContainerManager if nodeConfig == nil { @@ -2398,8 +2399,6 @@ func makeAllocationManager(t *testing.T, runtime *containertest.FakeRuntime, all containerManager = cm.NewFakeContainerManagerWithNodeConfig(*nodeConfig) } allocationManager := NewInMemoryManager( - containerManager.GetNodeConfig(), - containerManager.GetNodeAllocatableAbsolute(), statusManager, func(pod *v1.Pod) { /* For testing, just mark the pod as having a pod sync triggered in an annotation. */ @@ -2437,9 +2436,10 @@ func makeAllocationManager(t *testing.T, runtime *containertest.FakeRuntime, all }, }, nil } - handler := lifecycle.NewPredicateAdmitHandler(getNode, lifecycle.NewAdmissionFailureHandlerStub(), containerManager.UpdatePluginResources) - allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{handler}) + predicateHandler := lifecycle.NewPredicateAdmitHandler(getNode, lifecycle.NewAdmissionFailureHandlerStub(), containerManager.UpdatePluginResources) + resizeHandler := NewPodResizesAdmitHandler(containerManager, runtime, allocationManager, logger) + allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{resizeHandler, predicateHandler}) return allocationManager } diff --git a/pkg/kubelet/allocation/handlers.go b/pkg/kubelet/allocation/handlers.go new file mode 100644 index 00000000000..77e7433d7a3 --- /dev/null +++ b/pkg/kubelet/allocation/handlers.go @@ -0,0 +1,171 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package allocation
+
+import (
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/klog/v2"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/api/v1/resource"
+	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
+	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/cm"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
+	"k8s.io/kubernetes/pkg/kubelet/metrics"
+	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/kubelet/util/format"
+)
+
+// NewPodResizesAdmitHandler returns a PodAdmitHandler which is used to evaluate
+// if a pod resize can be allocated by the kubelet.
+func NewPodResizesAdmitHandler(containerManager cm.ContainerManager, containerRuntime kubecontainer.Runtime, allocationManager Manager, logger klog.Logger) lifecycle.PodAdmitHandler {
+	return &podResizesAdmitHandler{
+		containerManager:  containerManager,
+		containerRuntime:  containerRuntime,
+		allocationManager: allocationManager,
+		logger:            logger,
+	}
+}
+
+type podResizesAdmitHandler struct {
+	containerManager  cm.ContainerManager
+	containerRuntime  kubecontainer.Runtime
+	allocationManager Manager
+	logger            klog.Logger
+}
+
+func (h *podResizesAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
+	if attrs.Operation != lifecycle.ResizeOperation {
+		return lifecycle.PodAdmitResult{Admit: true}
+	}
+
+	pod := attrs.Pod
+	allocatedPod, _ := h.allocationManager.UpdatePodFromAllocation(pod)
+	if resizable, msg, reason := IsInPlacePodVerticalScalingAllowed(pod); !resizable {
+		// If there is a pending resize but the resize is not allowed, always use the allocated resources.
+		metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
+		return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
+	}
+	if resizeNotAllowed, msg := disallowResizeForSwappableContainers(h.containerRuntime, pod, allocatedPod); resizeNotAllowed {
+		// If this resize involves swap recalculation, set it as infeasible, as IPPR with swap is not supported for beta.
+		metrics.PodInfeasibleResizes.WithLabelValues("swap_limitation").Inc()
+		return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
+	}
+	if !apiequality.Semantic.DeepEqual(pod.Spec.Resources, allocatedPod.Spec.Resources) {
+		if resizable, msg, reason := IsInPlacePodLevelResourcesVerticalScalingAllowed(pod); !resizable {
+			// If there is a pending pod-level resources resize but the resize is not allowed, always use the allocated resources.
+			metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
+			return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
+		}
+	}
+
+	if v1qos.GetPodQOS(pod) == v1.PodQOSGuaranteed {
+		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveCPUs) &&
+			h.containerManager.GetNodeConfig().CPUManagerPolicy == string(cpumanager.PolicyStatic) &&
+			h.guaranteedPodResourceResizeRequired(pod, v1.ResourceCPU) {
+			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside CPU Manager policy \"%s\"", string(cpumanager.PolicyStatic))
+			h.logger.V(3).Info(msg, "pod", format.Pod(pod))
+			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_cpu_manager_static_policy").Inc()
+			return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
+		}
+		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveMemory) &&
+			h.containerManager.GetNodeConfig().MemoryManagerPolicy == string(memorymanager.PolicyTypeStatic) &&
+			h.guaranteedPodResourceResizeRequired(pod, v1.ResourceMemory) {
+			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside Memory Manager policy \"%s\"", string(memorymanager.PolicyTypeStatic))
+			h.logger.V(3).Info(msg, "pod", format.Pod(pod))
+			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_memory_manager_static_policy").Inc()
+			return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
+		}
+	}
+
+	allocatable := h.containerManager.GetNodeAllocatableAbsolute()
+	cpuAvailable := allocatable.Cpu().MilliValue()
+	memAvailable := allocatable.Memory().Value()
+	cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU)
+	memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory)
+
+	if cpuRequests > cpuAvailable || memRequests > memAvailable {
+		var msg string
+		if memRequests > memAvailable {
+			msg = fmt.Sprintf("memory, requested: %d, capacity: %d", memRequests, memAvailable)
+		} else {
+			msg = fmt.Sprintf("cpu, requested: %d, capacity: %d", cpuRequests, cpuAvailable)
+		}
+		msg = "Node didn't have enough capacity: " + msg
+		h.logger.V(3).Info(msg, "pod", klog.KObj(pod))
+		metrics.PodInfeasibleResizes.WithLabelValues("insufficient_node_allocatable").Inc()
+		return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
+	}
+
+	return lifecycle.PodAdmitResult{Admit: true}
+}
+
+func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desiredPod, allocatedPod *v1.Pod) (bool, string) {
+	if desiredPod == nil || allocatedPod == nil {
+		return false, ""
+	}
+	restartableMemoryResizePolicy := func(resizePolicies []v1.ContainerResizePolicy) bool {
+		for _, policy := range resizePolicies {
+			if policy.ResourceName == v1.ResourceMemory {
+				return policy.RestartPolicy == v1.RestartContainer
+			}
+		}
+		return false
+	}
+	allocatedContainers := make(map[string]v1.Container)
+	for _, container := range append(allocatedPod.Spec.Containers, allocatedPod.Spec.InitContainers...) {
+		allocatedContainers[container.Name] = container
+	}
+	for _, desiredContainer := range append(desiredPod.Spec.Containers, desiredPod.Spec.InitContainers...) {
+		allocatedContainer, ok := allocatedContainers[desiredContainer.Name]
+		if !ok {
+			continue
+		}
+		origMemRequest := desiredContainer.Resources.Requests[v1.ResourceMemory]
+		newMemRequest := allocatedContainer.Resources.Requests[v1.ResourceMemory]
+		if !origMemRequest.Equal(newMemRequest) && !restartableMemoryResizePolicy(allocatedContainer.ResizePolicy) {
+			aSwapBehavior := runtime.GetContainerSwapBehavior(desiredPod, &desiredContainer)
+			bSwapBehavior := runtime.GetContainerSwapBehavior(allocatedPod, &allocatedContainer)
+			if aSwapBehavior != kubetypes.NoSwap || bSwapBehavior != kubetypes.NoSwap {
+				return true, "In-place resize of containers with swap is not supported."
+			}
+		}
+	}
+	return false, ""
+}
+
+func (h *podResizesAdmitHandler) guaranteedPodResourceResizeRequired(pod *v1.Pod, resourceName v1.ResourceName) bool {
+	for container, containerType := range podutil.ContainerIter(&pod.Spec, podutil.InitContainers|podutil.Containers) {
+		if !IsResizableContainer(container, containerType) {
+			continue
+		}
+		requestedResources := container.Resources
+		allocatedresources, _ := h.allocationManager.GetContainerResourceAllocation(pod.UID, container.Name)
+		// For Guaranteed pods, requests must equal limits, so checking requests is sufficient.
+		if !requestedResources.Requests[resourceName].Equal(allocatedresources.Requests[resourceName]) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 5a11e482496..7af7f860839 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -689,8 +689,6 @@ func NewMainKubelet(ctx context.Context,
 	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
 	klet.allocationManager = allocation.NewManager(
 		klet.getRootDir(),
-		klet.containerManager.GetNodeConfig(),
-		klet.containerManager.GetNodeAllocatableAbsolute(),
 		klet.statusManager,
 		func(pod *v1.Pod) { klet.HandlePodSyncs(ctx, []*v1.Pod{pod}) },
 		klet.GetActivePods,
@@ -700,7 +698,6 @@ func NewMainKubelet(ctx context.Context,
 	)
 	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(ctx, klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
-
 	klet.runtimeService = kubeDeps.RemoteRuntimeService
 
 	if kubeDeps.KubeClient != nil {
@@ -807,6 +804,7 @@ func NewMainKubelet(ctx context.Context,
 	klet.streamingRuntime = runtime
 	klet.runner = runtime
 	klet.allocationManager.SetContainerRuntime(runtime)
+	resizeAdmitHandler := allocation.NewPodResizesAdmitHandler(klet.containerManager, runtime, klet.allocationManager, logger)
 
 	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime, runtimeCacheRefreshPeriod)
 	if err != nil {
@@ -1023,6 +1021,8 @@ func NewMainKubelet(ctx context.Context,
 		killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)
 
 	klet.evictionManager = evictionManager
+	handlers := []lifecycle.PodAdmitHandler{}
+	handlers = append(handlers, evictionAdmitHandler)
 
 	if utilfeature.DefaultFeatureGate.Enabled(features.NodeDeclaredFeatures) {
 		v, err := versionutil.Parse(version.Get().String())
@@ -1039,9 +1039,6 @@ func NewMainKubelet(ctx context.Context,
 		klet.nodeDeclaredFeaturesSet = ndf.NewFeatureSet(klet.nodeDeclaredFeatures...)
 	}
 
-	handlers := []lifecycle.PodAdmitHandler{}
-	handlers = append(handlers, evictionAdmitHandler)
-
 	// Safe, allowed sysctls can always be used as unsafe sysctls in the spec.
 	// Hence, we concatenate those two lists.
 	safeAndUnsafeSysctls := append(sysctl.SafeSysctlAllowlist(ctx), allowedUnsafeSysctls...)
@@ -1109,7 +1106,8 @@ func NewMainKubelet(ctx context.Context,
 	})
 	klet.shutdownManager = shutdownManager
 	handlers = append(handlers, shutdownManager)
-	klet.allocationManager.AddPodAdmitHandlers(handlers)
+
+	klet.allocationManager.AddPodAdmitHandlers(append([]lifecycle.PodAdmitHandler{resizeAdmitHandler}, handlers...))
 
 	var usernsIDsPerPod *int64
 	if kubeCfg.UserNamespaces != nil {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 69103b8b226..4111ba7c3af 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -332,8 +332,6 @@ func newTestKubeletWithImageList(
 	}
 
 	kubelet.allocationManager = allocation.NewInMemoryManager(
-		kubelet.containerManager.GetNodeConfig(),
-		kubelet.containerManager.GetNodeAllocatableAbsolute(),
 		kubelet.statusManager,
 		func(pod *v1.Pod) { kubelet.HandlePodSyncs(tCtx, []*v1.Pod{pod}) },
 		kubelet.GetActivePods,
@@ -398,6 +396,7 @@ func newTestKubeletWithImageList(
 	kubelet.evictionManager = evictionManager
 	handlers := []lifecycle.PodAdmitHandler{}
 	handlers = append(handlers, evictionAdmitHandler)
+	handlers = append(handlers, allocation.NewPodResizesAdmitHandler(kubelet.containerManager, fakeRuntime, kubelet.allocationManager, logger))
 
 	// setup shutdown manager
 	shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{
diff --git a/pkg/kubelet/lifecycle/interfaces.go b/pkg/kubelet/lifecycle/interfaces.go
index 3be7adade54..864c69dc8fc 100644
--- a/pkg/kubelet/lifecycle/interfaces.go
+++ b/pkg/kubelet/lifecycle/interfaces.go
@@ -16,7 +16,7 @@ limitations under the License.
 
 package lifecycle
 
-import "k8s.io/api/core/v1"
+import v1 "k8s.io/api/core/v1"
 
 // PodAdmitAttributes is the context for a pod admission decision.
 // The member fields of this struct should never be mutated.
@@ -25,8 +25,20 @@ type PodAdmitAttributes struct {
 	Pod *v1.Pod
 	// all pods bound to the kubelet excluding the pod being evaluated
 	OtherPods []*v1.Pod
+	// the operation being performed; either "add" or "resize"
+	Operation Operation
 }
 
+// Operation represents the type of operation being performed on a pod.
+type Operation string
+
+const (
+	// AddOperation indicates that the pod is being added to the kubelet.
+	AddOperation Operation = "add"
+	// ResizeOperation indicates that the pod is being resized.
+	ResizeOperation Operation = "resize"
+)
+
 // PodAdmitResult provides the result of a pod admission decision.
 type PodAdmitResult struct {
 	// if true, the pod should be admitted.
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index 4dc418744a0..8892f2daf59 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -35,7 +35,6 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/kubelet/allocation"
-	"k8s.io/kubernetes/pkg/kubelet/cm"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -455,7 +454,7 @@ func createPodWorkers() (*podWorkers, *containertest.FakeRuntime, map[types.UID]
 		time.Second,
 		time.Millisecond,
 		fakeCache,
-		allocation.NewInMemoryManager(cm.NodeConfig{}, nil, nil, nil, nil, nil, nil, nil),
+		allocation.NewInMemoryManager(nil, nil, nil, nil, nil, nil),
 	)
 	workers := w.(*podWorkers)
 	workers.clock = clock
@@ -2135,7 +2134,7 @@ func TestFakePodWorkers(t *testing.T) {
 		time.Second,
 		time.Second,
 		fakeCache,
-		allocation.NewInMemoryManager(cm.NodeConfig{}, nil, nil, nil, nil, nil, nil, nil),
+		allocation.NewInMemoryManager(nil, nil, nil, nil, nil, nil),
 	)
 	fakePodWorkers := &fakePodWorkers{
 		syncPodFn: kubeletForFakeWorkers.SyncPod,
diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index c7d611ab536..865cfa3678a 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -281,6 +281,11 @@ func (m *manager) SetPodResizePendingCondition(podUID types.UID, reason, message
 
 	previousCondition := m.podResizeConditions[podUID].PodResizePending
 
+	if reason != v1.PodReasonInfeasible {
+		// For all other "pending" reasons, we set the reason to "Deferred".
+		reason = v1.PodReasonDeferred
+	}
+
 	m.podResizeConditions[podUID] = podResizeConditions{
 		PodResizePending:    updatedPodResizeCondition(v1.PodResizePending, m.podResizeConditions[podUID].PodResizePending, reason, message, observedGeneration),
 		PodResizeInProgress: m.podResizeConditions[podUID].PodResizeInProgress,
diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go
index 0ce640ad61d..abeca21915c 100644
--- a/pkg/kubelet/status/status_manager_test.go
+++ b/pkg/kubelet/status/status_manager_test.go
@@ -2161,7 +2161,7 @@ func TestPodResizeConditions(t *testing.T) {
 		{
 			name: "set pod resize pending condition to deferred with message",
 			updateFunc: func(podUID types.UID) bool {
-				return m.SetPodResizePendingCondition(podUID, v1.PodReasonDeferred, "some-message", 1)
+				return m.SetPodResizePendingCondition(podUID, "some-reason", "some-message", 1)
 			},
 			expectedUpdateFuncReturnVal: true,
 			expected: []*v1.PodCondition{
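Note for reviewers: the diff above folds the old canResizePod path into the generic PodAdmitHandler chain, keyed off the new lifecycle.Operation field. As a minimal sketch (not part of this PR — the handler name and reason string below are hypothetical), this is the pattern the field enables: a handler that passes "add" operations through untouched and gates only resizes, mirroring how podResizesAdmitHandler.Admit short-circuits on attrs.Operation.

    package example

    import (
    	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
    )

    // resizeOnlyHandler is a hypothetical PodAdmitHandler that only evaluates
    // resize operations, following the pattern introduced in this PR.
    type resizeOnlyHandler struct{}

    func (h *resizeOnlyHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    	// Pod additions are admitted unconditionally; only resize operations
    	// reach the feasibility check below.
    	if attrs.Operation != lifecycle.ResizeOperation {
    		return lifecycle.PodAdmitResult{Admit: true}
    	}
    	// Resize-specific checks would go here; a rejection's Reason/Message is
    	// surfaced by the status manager as a PodResizePending condition.
    	return lifecycle.PodAdmitResult{
    		Admit:   false,
    		Reason:  "ExampleInfeasible", // hypothetical reason string
    		Message: "resize rejected by example handler",
    	}
    }

Ordering matters: kubelet.go prepends resizeAdmitHandler to the handler slice, so resize-specific infeasibility (swap, static CPU/memory manager policies, node allocatable) is reported before generic admission failures, and SetPodResizePendingCondition now maps every reason other than Infeasible to Deferred.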