Mirror of https://github.com/kubernetes/kubernetes.git

Merge pull request #133427 from natasha41575/admitHandler

[FG:InPlacePodVerticalScaling] refactor allocation feasibility check into its own admitHandler

Commit: 311071d300
9 changed files with 220 additions and 178 deletions

@@ -18,7 +18,6 @@ package allocation
import (
	"context"
	"fmt"
	"path/filepath"
	"slices"
	"sync"

@@ -34,21 +33,15 @@ import (
	resourcehelper "k8s.io/component-helpers/resource"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/api/v1/resource"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
	"k8s.io/kubernetes/pkg/kubelet/config"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/status"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// podStatusManagerStateFile is the file name where status manager stores its state

@@ -121,12 +114,10 @@ type Manager interface {
type manager struct {
	allocated state.State

	admitHandlers lifecycle.PodAdmitHandlers
	containerRuntime kubecontainer.Runtime
	statusManager status.Manager
	sourcesReady config.SourcesReady
-	nodeConfig cm.NodeConfig
-	nodeAllocatableAbsolute v1.ResourceList

	ticker *time.Ticker
	triggerPodSync func(pod *v1.Pod)

@@ -139,10 +130,7 @@ type manager struct {
	recorder record.EventRecorderLogger
}

-func NewManager(
-	checkpointDirectory string,
-	nodeConfig cm.NodeConfig,
-	nodeAllocatableAbsolute v1.ResourceList,
+func NewManager(checkpointDirectory string,
	statusManager status.Manager,
	triggerPodSync func(pod *v1.Pod),
	getActivePods func() []*v1.Pod,

@@ -156,11 +144,9 @@ func NewManager(
	return &manager{
		allocated: newStateImpl(logger, checkpointDirectory, allocatedPodsStateFile),

		statusManager: statusManager,
		admitHandlers: lifecycle.PodAdmitHandlers{},
		sourcesReady: sourcesReady,
-		nodeConfig: nodeConfig,
-		nodeAllocatableAbsolute: nodeAllocatableAbsolute,

		ticker: time.NewTicker(initialRetryDelay),
		triggerPodSync: triggerPodSync,

@@ -189,8 +175,6 @@ func newStateImpl(logger klog.Logger, checkpointDirectory, checkpointName string
// NewInMemoryManager returns an allocation manager that doesn't persist state.
// For testing purposes only!
func NewInMemoryManager(
-	nodeConfig cm.NodeConfig,
-	nodeAllocatableAbsolute v1.ResourceList,
	statusManager status.Manager,
	triggerPodSync func(pod *v1.Pod),
	getActivePods func() []*v1.Pod,

@@ -201,11 +185,9 @@ func NewInMemoryManager(
	return &manager{
		allocated: state.NewStateMemory(nil),

		statusManager: statusManager,
		admitHandlers: lifecycle.PodAdmitHandlers{},
		sourcesReady: sourcesReady,
-		nodeConfig: nodeConfig,
-		nodeAllocatableAbsolute: nodeAllocatableAbsolute,

		ticker: time.NewTicker(initialRetryDelay),
		triggerPodSync: triggerPodSync,

@@ -554,7 +536,7 @@ func (m *manager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, strin
	// Check if we can admit the pod; if so, update the allocation.
	allocatedPods := m.getAllocatedPods(activePods)
-	ok, reason, message := m.canAdmitPod(logger, allocatedPods, pod)
+	ok, reason, message := m.canAdmitPod(logger, allocatedPods, pod, lifecycle.AddOperation)

	if ok && utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		// Checkpoint the resource values at which the Pod has been admitted or resized.

@@ -582,42 +564,20 @@ func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
}

func (m *manager) handlePodResourcesResize(logger klog.Logger, pod *v1.Pod) (bool, error) {
-	allocatedPod, updated := m.UpdatePodFromAllocation(pod)
+	_, updated := m.UpdatePodFromAllocation(pod)
	if !updated {
		// Desired resources == allocated resources. Pod allocation does not need to be updated.
		m.statusManager.ClearPodResizePendingCondition(pod.UID)
		return false, nil
-
-	} else if resizable, msg, reason := IsInPlacePodVerticalScalingAllowed(pod); !resizable {
-		// If there is a pending resize but the resize is not allowed, always use the allocated resources.
-		metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
-		m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg, pod.Generation)
-		return false, nil
-
-	} else if resizeNotAllowed, msg := disallowResizeForSwappableContainers(m.containerRuntime, pod, allocatedPod); resizeNotAllowed {
-		// If this resize involve swap recalculation, set as infeasible, as IPPR with swap is not supported for beta.
-		metrics.PodInfeasibleResizes.WithLabelValues("swap_limitation").Inc()
-		m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg, pod.Generation)
-		return false, nil
	}

-	if !apiequality.Semantic.DeepEqual(pod.Spec.Resources, allocatedPod.Spec.Resources) {
-		if resizable, msg, reason := IsInPlacePodLevelResourcesVerticalScalingAllowed(pod); !resizable {
-			// If there is a pending pod-level resources resize but the resize is not allowed, always use the allocated resources.
-			metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
-			m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg, pod.Generation)
-			return false, nil
-		}
-	}
-
	// Desired resources != allocated resources. Can we update the allocation to the desired resources?
-	fit, reason, message := m.canResizePod(logger, m.getAllocatedPods(m.getActivePods()), pod)
+	fit, reason, message := m.canAdmitPod(logger, m.getAllocatedPods(m.getActivePods()), pod, lifecycle.ResizeOperation)
	if fit {
		// Update pod resource allocation checkpoint
		if err := m.SetAllocatedResources(pod); err != nil {
			return false, err
		}
-		allocatedPod = pod
		m.statusManager.ClearPodResizePendingCondition(pod.UID)

		// Clear any errors that may have been surfaced from a previous resize and update the

@@ -625,9 +585,8 @@ func (m *manager) handlePodResourcesResize(logger klog.Logger, pod *v1.Pod) (boo
		m.statusManager.ClearPodResizeInProgressCondition(pod.UID)
		m.statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", pod.Generation)

-		msg := events.PodResizeStartedMsg(logger, allocatedPod, pod.Generation)
+		msg := events.PodResizeStartedMsg(logger, pod, pod.Generation)
		m.recorder.WithLogger(logger).Eventf(pod, v1.EventTypeNormal, events.ResizeStarted, msg)

		return true, nil
	}

@@ -645,40 +604,6 @@ func (m *manager) handlePodResourcesResize(logger klog.Logger, pod *v1.Pod) (boo
	return false, nil
}

-func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desiredPod, allocatedPod *v1.Pod) (bool, string) {
-	if desiredPod == nil || allocatedPod == nil {
-		return false, ""
-	}
-	restartableMemoryResizePolicy := func(resizePolicies []v1.ContainerResizePolicy) bool {
-		for _, policy := range resizePolicies {
-			if policy.ResourceName == v1.ResourceMemory {
-				return policy.RestartPolicy == v1.RestartContainer
-			}
-		}
-		return false
-	}
-	allocatedContainers := make(map[string]v1.Container)
-	for _, container := range append(allocatedPod.Spec.Containers, allocatedPod.Spec.InitContainers...) {
-		allocatedContainers[container.Name] = container
-	}
-	for _, desiredContainer := range append(desiredPod.Spec.Containers, desiredPod.Spec.InitContainers...) {
-		allocatedContainer, ok := allocatedContainers[desiredContainer.Name]
-		if !ok {
-			continue
-		}
-		origMemRequest := desiredContainer.Resources.Requests[v1.ResourceMemory]
-		newMemRequest := allocatedContainer.Resources.Requests[v1.ResourceMemory]
-		if !origMemRequest.Equal(newMemRequest) && !restartableMemoryResizePolicy(allocatedContainer.ResizePolicy) {
-			aSwapBehavior := runtime.GetContainerSwapBehavior(desiredPod, &desiredContainer)
-			bSwapBehavior := runtime.GetContainerSwapBehavior(allocatedPod, &allocatedContainer)
-			if aSwapBehavior != kubetypes.NoSwap || bSwapBehavior != kubetypes.NoSwap {
-				return true, "In-place resize of containers with swap is not supported."
-			}
-		}
-	}
-	return false, ""
-}
-
// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is new pod, while "pods" are all admitted pods
// The function returns a boolean value indicating whether the pod

@@ -686,15 +611,15 @@ func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desired
// the pod cannot be admitted.
// allocatedPods should represent the pods that have already been admitted, along with their
// admitted (allocated) resources.
-func (m *manager) canAdmitPod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
+func (m *manager) canAdmitPod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod, operation lifecycle.Operation) (bool, string, string) {
	// Filter out the pod being evaluated.
	allocatedPods = slices.DeleteFunc(allocatedPods, func(p *v1.Pod) bool { return p.UID == pod.UID })

	// If any handler rejects, the pod is rejected.
-	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: allocatedPods}
+	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: allocatedPods, Operation: operation}
	for _, podAdmitHandler := range m.admitHandlers {
		if result := podAdmitHandler.Admit(attrs); !result.Admit {
-			logger.Info("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
+			logger.Info("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message, "operation", operation)
			return false, result.Reason, result.Message
		}
	}

@@ -702,73 +627,6 @@ func (m *manager) canAdmitPod(logger klog.Logger, allocatedPods []*v1.Pod, pod *
	return true, "", ""
}

-// canResizePod determines if the requested resize is currently feasible.
-// pod should hold the desired (pre-allocated) spec.
-// Returns true if the resize can proceed; returns a reason and message
-// otherwise.
-func (m *manager) canResizePod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
-	// TODO: Move this logic into a PodAdmitHandler by introducing an operation field to
-	// lifecycle.PodAdmitAttributes, and combine canResizePod with canAdmitPod.
-	if v1qos.GetPodQOS(pod) == v1.PodQOSGuaranteed {
-		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveCPUs) &&
-			m.nodeConfig.CPUManagerPolicy == string(cpumanager.PolicyStatic) &&
-			m.guaranteedPodResourceResizeRequired(pod, v1.ResourceCPU) {
-			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside CPU Manager policy \"%s\"", string(cpumanager.PolicyStatic))
-			logger.V(3).Info(msg, "pod", format.Pod(pod))
-			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_cpu_manager_static_policy").Inc()
-			return false, v1.PodReasonInfeasible, msg
-		}
-		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveMemory) &&
-			m.nodeConfig.MemoryManagerPolicy == string(memorymanager.PolicyTypeStatic) &&
-			m.guaranteedPodResourceResizeRequired(pod, v1.ResourceMemory) {
-			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside Memory Manager policy \"%s\"", string(memorymanager.PolicyTypeStatic))
-			logger.V(3).Info(msg, "pod", format.Pod(pod))
-			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_memory_manager_static_policy").Inc()
-			return false, v1.PodReasonInfeasible, msg
-		}
-	}
-
-	cpuAvailable := m.nodeAllocatableAbsolute.Cpu().MilliValue()
-	memAvailable := m.nodeAllocatableAbsolute.Memory().Value()
-	cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU)
-	memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory)
-	if cpuRequests > cpuAvailable || memRequests > memAvailable {
-		var msg string
-		if memRequests > memAvailable {
-			msg = fmt.Sprintf("memory, requested: %d, capacity: %d", memRequests, memAvailable)
-		} else {
-			msg = fmt.Sprintf("cpu, requested: %d, capacity: %d", cpuRequests, cpuAvailable)
-		}
-		msg = "Node didn't have enough capacity: " + msg
-		logger.V(3).Info(msg, "pod", klog.KObj(pod))
-		metrics.PodInfeasibleResizes.WithLabelValues("insufficient_node_allocatable").Inc()
-		return false, v1.PodReasonInfeasible, msg
-	}
-
-	if ok, failReason, failMessage := m.canAdmitPod(logger, allocatedPods, pod); !ok {
-		// Log reason and return.
-		logger.V(3).Info("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage)
-		return false, v1.PodReasonDeferred, failMessage
-	}
-
-	return true, "", ""
-}
-
-func (m *manager) guaranteedPodResourceResizeRequired(pod *v1.Pod, resourceName v1.ResourceName) bool {
-	for container, containerType := range podutil.ContainerIter(&pod.Spec, podutil.InitContainers|podutil.Containers) {
-		if !IsResizableContainer(container, containerType) {
-			continue
-		}
-		requestedResources := container.Resources
-		allocatedresources, _ := m.GetContainerResourceAllocation(pod.UID, container.Name)
-		// For Guaranteed pods, requests must equal limits, so checking requests is sufficient.
-		if !requestedResources.Requests[resourceName].Equal(allocatedresources.Requests[resourceName]) {
-			return true
-		}
-	}
-	return false
-}
-
func (m *manager) getAllocatedPods(activePods []*v1.Pod) []*v1.Pod {
	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		return activePods

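Note: the capacity comparison removed above (and reintroduced in the new handlers.go later in this diff) mixes units: CPU is compared in millicores via MilliValue() while memory is compared in bytes via Value(). A minimal standalone sketch of that arithmetic with assumed quantities, not part of this change:

package main

import (
	"fmt"

	apiresource "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Assumed node allocatable and pod requests, for illustration only.
	cpuAvailable := apiresource.MustParse("2")     // 2 cores   -> 2000 millicores
	memAvailable := apiresource.MustParse("4Gi")   // 4 GiB     -> 4294967296 bytes
	cpuRequested := apiresource.MustParse("2500m") // 2.5 cores -> 2500 millicores
	memRequested := apiresource.MustParse("1Gi")

	if cpuRequested.MilliValue() > cpuAvailable.MilliValue() {
		// Mirrors the "cpu, requested: %d, capacity: %d" message in the check above.
		fmt.Printf("cpu, requested: %d, capacity: %d\n", cpuRequested.MilliValue(), cpuAvailable.MilliValue())
	}
	if memRequested.Value() > memAvailable.Value() {
		fmt.Printf("memory, requested: %d, capacity: %d\n", memRequested.Value(), memAvailable.Value())
	}
	// Prints: cpu, requested: 2500, capacity: 2000
}
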
@@ -2390,6 +2390,7 @@ func TestRecordPodDeferredAcceptedResizes(t *testing.T) {
func makeAllocationManager(t *testing.T, runtime *containertest.FakeRuntime, allocatedPods []*v1.Pod, nodeConfig *cm.NodeConfig) Manager {
	t.Helper()
+	logger, _ := ktesting.NewTestContext(t)
	statusManager := status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker())
	var containerManager *cm.FakeContainerManager
	if nodeConfig == nil {

@@ -2398,8 +2399,6 @@ func makeAllocationManager(t *testing.T, runtime *containertest.FakeRuntime, all
		containerManager = cm.NewFakeContainerManagerWithNodeConfig(*nodeConfig)
	}
	allocationManager := NewInMemoryManager(
-		containerManager.GetNodeConfig(),
-		containerManager.GetNodeAllocatableAbsolute(),
		statusManager,
		func(pod *v1.Pod) {
			/* For testing, just mark the pod as having a pod sync triggered in an annotation. */

@@ -2437,9 +2436,10 @@ func makeAllocationManager(t *testing.T, runtime *containertest.FakeRuntime, all
			},
		}, nil
	}
-	handler := lifecycle.NewPredicateAdmitHandler(getNode, lifecycle.NewAdmissionFailureHandlerStub(), containerManager.UpdatePluginResources)
-	allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{handler})

+	predicateHandler := lifecycle.NewPredicateAdmitHandler(getNode, lifecycle.NewAdmissionFailureHandlerStub(), containerManager.UpdatePluginResources)
+	resizeHandler := NewPodResizesAdmitHandler(containerManager, runtime, allocationManager, logger)
+	allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{resizeHandler, predicateHandler})
	return allocationManager
}

pkg/kubelet/allocation/handlers.go (new file, 171 lines)

@@ -0,0 +1,171 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package allocation

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/api/v1/resource"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// NewPodResizesAdmitHandler returns a PodAdmitHandler which is used to evaluate
// if a pod resize can be allocated by the kubelet.
func NewPodResizesAdmitHandler(containerManager cm.ContainerManager, containerRuntime kubecontainer.Runtime, allocationManager Manager, logger klog.Logger) lifecycle.PodAdmitHandler {
	return &podResizesAdmitHandler{
		containerManager: containerManager,
		containerRuntime: containerRuntime,
		allocationManager: allocationManager,
		logger: logger,
	}
}

type podResizesAdmitHandler struct {
	containerManager cm.ContainerManager
	containerRuntime kubecontainer.Runtime
	allocationManager Manager
	logger klog.Logger
}

func (h *podResizesAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	if attrs.Operation != lifecycle.ResizeOperation {
		return lifecycle.PodAdmitResult{Admit: true}
	}

	pod := attrs.Pod
	allocatedPod, _ := h.allocationManager.UpdatePodFromAllocation(pod)
	if resizable, msg, reason := IsInPlacePodVerticalScalingAllowed(pod); !resizable {
		// If there is a pending resize but the resize is not allowed, always use the allocated resources.
		metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
		return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
	}
	if resizeNotAllowed, msg := disallowResizeForSwappableContainers(h.containerRuntime, pod, allocatedPod); resizeNotAllowed {
		// If this resize involve swap recalculation, set as infeasible, as IPPR with swap is not supported for beta.
		metrics.PodInfeasibleResizes.WithLabelValues("swap_limitation").Inc()
		return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
	}
	if !apiequality.Semantic.DeepEqual(pod.Spec.Resources, allocatedPod.Spec.Resources) {
		if resizable, msg, reason := IsInPlacePodLevelResourcesVerticalScalingAllowed(pod); !resizable {
			// If there is a pending pod-level resources resize but the resize is not allowed, always use the allocated resources.
			metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
			return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
		}
	}

	if v1qos.GetPodQOS(pod) == v1.PodQOSGuaranteed {
		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveCPUs) &&
			h.containerManager.GetNodeConfig().CPUManagerPolicy == string(cpumanager.PolicyStatic) &&
			h.guaranteedPodResourceResizeRequired(pod, v1.ResourceCPU) {
			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside CPU Manager policy \"%s\"", string(cpumanager.PolicyStatic))
			h.logger.V(3).Info(msg, "pod", format.Pod(pod))
			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_cpu_manager_static_policy").Inc()
			return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
		}
		if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingExclusiveMemory) &&
			h.containerManager.GetNodeConfig().MemoryManagerPolicy == string(memorymanager.PolicyTypeStatic) &&
			h.guaranteedPodResourceResizeRequired(pod, v1.ResourceMemory) {
			msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside Memory Manager policy \"%s\"", string(memorymanager.PolicyTypeStatic))
			h.logger.V(3).Info(msg, "pod", format.Pod(pod))
			metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_memory_manager_static_policy").Inc()
			return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
		}
	}

	allocatable := h.containerManager.GetNodeAllocatableAbsolute()
	cpuAvailable := allocatable.Cpu().MilliValue()
	memAvailable := allocatable.Memory().Value()
	cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU)
	memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory)

	if cpuRequests > cpuAvailable || memRequests > memAvailable {
		var msg string
		if memRequests > memAvailable {
			msg = fmt.Sprintf("memory, requested: %d, capacity: %d", memRequests, memAvailable)
		} else {
			msg = fmt.Sprintf("cpu, requested: %d, capacity: %d", cpuRequests, cpuAvailable)
		}
		msg = "Node didn't have enough capacity: " + msg
		h.logger.V(3).Info(msg, "pod", klog.KObj(pod))
		metrics.PodInfeasibleResizes.WithLabelValues("insufficient_node_allocatable").Inc()
		return lifecycle.PodAdmitResult{Admit: false, Reason: v1.PodReasonInfeasible, Message: msg}
	}

	return lifecycle.PodAdmitResult{Admit: true}
}

func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desiredPod, allocatedPod *v1.Pod) (bool, string) {
	if desiredPod == nil || allocatedPod == nil {
		return false, ""
	}
	restartableMemoryResizePolicy := func(resizePolicies []v1.ContainerResizePolicy) bool {
		for _, policy := range resizePolicies {
			if policy.ResourceName == v1.ResourceMemory {
				return policy.RestartPolicy == v1.RestartContainer
			}
		}
		return false
	}
	allocatedContainers := make(map[string]v1.Container)
	for _, container := range append(allocatedPod.Spec.Containers, allocatedPod.Spec.InitContainers...) {
		allocatedContainers[container.Name] = container
	}
	for _, desiredContainer := range append(desiredPod.Spec.Containers, desiredPod.Spec.InitContainers...) {
		allocatedContainer, ok := allocatedContainers[desiredContainer.Name]
		if !ok {
			continue
		}
		origMemRequest := desiredContainer.Resources.Requests[v1.ResourceMemory]
		newMemRequest := allocatedContainer.Resources.Requests[v1.ResourceMemory]
		if !origMemRequest.Equal(newMemRequest) && !restartableMemoryResizePolicy(allocatedContainer.ResizePolicy) {
			aSwapBehavior := runtime.GetContainerSwapBehavior(desiredPod, &desiredContainer)
			bSwapBehavior := runtime.GetContainerSwapBehavior(allocatedPod, &allocatedContainer)
			if aSwapBehavior != kubetypes.NoSwap || bSwapBehavior != kubetypes.NoSwap {
				return true, "In-place resize of containers with swap is not supported."
			}
		}
	}
	return false, ""
}

func (h *podResizesAdmitHandler) guaranteedPodResourceResizeRequired(pod *v1.Pod, resourceName v1.ResourceName) bool {
	for container, containerType := range podutil.ContainerIter(&pod.Spec, podutil.InitContainers|podutil.Containers) {
		if !IsResizableContainer(container, containerType) {
			continue
		}
		requestedResources := container.Resources
		allocatedresources, _ := h.allocationManager.GetContainerResourceAllocation(pod.UID, container.Name)
		// For Guaranteed pods, requests must equal limits, so checking requests is sufficient.
		if !requestedResources.Requests[resourceName].Equal(allocatedresources.Requests[resourceName]) {
			return true
		}
	}
	return false
}

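A minimal test sketch, not part of this PR, of the early return at the top of Admit above: for any operation other than resize the handler admits unconditionally and never touches its collaborators, so nil dependencies are enough here. The test name is hypothetical.

package allocation

import (
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)

func TestPodResizesAdmitHandlerIgnoresAddOperation(t *testing.T) {
	// nil containerManager/runtime/allocationManager are acceptable because the
	// handler returns before using them for non-resize operations.
	handler := NewPodResizesAdmitHandler(nil, nil, nil, klog.Background())
	result := handler.Admit(&lifecycle.PodAdmitAttributes{Operation: lifecycle.AddOperation})
	if !result.Admit {
		t.Fatalf("expected add operation to be admitted, got reason=%q message=%q", result.Reason, result.Message)
	}
}
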
@@ -689,8 +689,6 @@ func NewMainKubelet(ctx context.Context,
	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
	klet.allocationManager = allocation.NewManager(
		klet.getRootDir(),
-		klet.containerManager.GetNodeConfig(),
-		klet.containerManager.GetNodeAllocatableAbsolute(),
		klet.statusManager,
		func(pod *v1.Pod) { klet.HandlePodSyncs(ctx, []*v1.Pod{pod}) },
		klet.GetActivePods,

@@ -700,7 +698,6 @@ func NewMainKubelet(ctx context.Context,
	)

	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(ctx, klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)

	klet.runtimeService = kubeDeps.RemoteRuntimeService

	if kubeDeps.KubeClient != nil {

@@ -807,6 +804,7 @@ func NewMainKubelet(ctx context.Context,
	klet.streamingRuntime = runtime
	klet.runner = runtime
	klet.allocationManager.SetContainerRuntime(runtime)
+	resizeAdmitHandler := allocation.NewPodResizesAdmitHandler(klet.containerManager, runtime, klet.allocationManager, logger)

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime, runtimeCacheRefreshPeriod)
	if err != nil {

@@ -1023,6 +1021,8 @@ func NewMainKubelet(ctx context.Context,
		killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)

	klet.evictionManager = evictionManager
+	handlers := []lifecycle.PodAdmitHandler{}
+	handlers = append(handlers, evictionAdmitHandler)

	if utilfeature.DefaultFeatureGate.Enabled(features.NodeDeclaredFeatures) {
		v, err := versionutil.Parse(version.Get().String())

@@ -1039,9 +1039,6 @@ func NewMainKubelet(ctx context.Context,
		klet.nodeDeclaredFeaturesSet = ndf.NewFeatureSet(klet.nodeDeclaredFeatures...)
	}

-	handlers := []lifecycle.PodAdmitHandler{}
-	handlers = append(handlers, evictionAdmitHandler)
-
	// Safe, allowed sysctls can always be used as unsafe sysctls in the spec.
	// Hence, we concatenate those two lists.
	safeAndUnsafeSysctls := append(sysctl.SafeSysctlAllowlist(ctx), allowedUnsafeSysctls...)

@@ -1109,7 +1106,8 @@ func NewMainKubelet(ctx context.Context,
	})
	klet.shutdownManager = shutdownManager
	handlers = append(handlers, shutdownManager)
-	klet.allocationManager.AddPodAdmitHandlers(handlers)

+	klet.allocationManager.AddPodAdmitHandlers(append([]lifecycle.PodAdmitHandler{resizeAdmitHandler}, handlers...))

	var usernsIDsPerPod *int64
	if kubeCfg.UserNamespaces != nil {

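Ordering matters in the registration above: the resize admit handler is prepended so a resize is screened for in-place-resize feasibility before the general admit handlers run. A condensed sketch of that wiring; the function name is hypothetical and the arguments are assumed to be constructed as in NewMainKubelet:

package example

import (
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/allocation"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)

// registerAdmitHandlers mirrors the kubelet wiring above: the resize handler is
// placed ahead of the other admit handlers before registration.
func registerAdmitHandlers(
	allocationManager allocation.Manager,
	containerManager cm.ContainerManager,
	runtime kubecontainer.Runtime,
	logger klog.Logger,
	otherHandlers []lifecycle.PodAdmitHandler,
) {
	resizeHandler := allocation.NewPodResizesAdmitHandler(containerManager, runtime, allocationManager, logger)
	allocationManager.AddPodAdmitHandlers(append([]lifecycle.PodAdmitHandler{resizeHandler}, otherHandlers...))
}
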
@@ -332,8 +332,6 @@ func newTestKubeletWithImageList(
	}

	kubelet.allocationManager = allocation.NewInMemoryManager(
-		kubelet.containerManager.GetNodeConfig(),
-		kubelet.containerManager.GetNodeAllocatableAbsolute(),
		kubelet.statusManager,
		func(pod *v1.Pod) { kubelet.HandlePodSyncs(tCtx, []*v1.Pod{pod}) },
		kubelet.GetActivePods,

@@ -398,6 +396,7 @@ func newTestKubeletWithImageList(
	kubelet.evictionManager = evictionManager
	handlers := []lifecycle.PodAdmitHandler{}
	handlers = append(handlers, evictionAdmitHandler)
+	handlers = append(handlers, allocation.NewPodResizesAdmitHandler(kubelet.containerManager, fakeRuntime, kubelet.allocationManager, logger))

	// setup shutdown manager
	shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{

@@ -16,7 +16,7 @@ limitations under the License.

package lifecycle

-import "k8s.io/api/core/v1"
+import v1 "k8s.io/api/core/v1"

// PodAdmitAttributes is the context for a pod admission decision.
// The member fields of this struct should never be mutated.

@@ -25,8 +25,20 @@ type PodAdmitAttributes struct {
	Pod *v1.Pod
	// all pods bound to the kubelet excluding the pod being evaluated
	OtherPods []*v1.Pod
+	// the operation being performed; either "add" or "resize"
+	Operation Operation
}

+// Operation represents the type of operation being performed on a pod.
+type Operation string
+
+const (
+	// AddOperation indicates that the pod is being added to the kubelet.
+	AddOperation Operation = "add"
+	// ResizeOperation indicates that the pod is being resized.
+	ResizeOperation Operation = "resize"
+)
+
// PodAdmitResult provides the result of a pod admission decision.
type PodAdmitResult struct {
	// if true, the pod should be admitted.

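With the new Operation field, an admit handler can restrict itself to one kind of admission and stay out of the way otherwise, the same pattern the pod-resize handler uses. A hypothetical example, not part of this PR (the handler name and annotation key are made up), that rejects resizes of pods carrying a particular annotation:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)

type frozenPodResizeHandler struct{}

// NewFrozenPodResizeHandler returns a handler that only inspects resize
// operations; adds are admitted without any checks.
func NewFrozenPodResizeHandler() lifecycle.PodAdmitHandler {
	return &frozenPodResizeHandler{}
}

func (h *frozenPodResizeHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	if attrs.Operation != lifecycle.ResizeOperation {
		return lifecycle.PodAdmitResult{Admit: true}
	}
	if attrs.Pod.Annotations["example.com/freeze-resources"] == "true" {
		return lifecycle.PodAdmitResult{
			Admit:   false,
			Reason:  v1.PodReasonInfeasible,
			Message: "resize is not allowed while the pod is frozen (example only)",
		}
	}
	return lifecycle.PodAdmitResult{Admit: true}
}
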
@@ -35,7 +35,6 @@ import (
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/kubelet/allocation"
-	"k8s.io/kubernetes/pkg/kubelet/cm"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"

@@ -455,7 +454,7 @@ func createPodWorkers() (*podWorkers, *containertest.FakeRuntime, map[types.UID]
		time.Second,
		time.Millisecond,
		fakeCache,
-		allocation.NewInMemoryManager(cm.NodeConfig{}, nil, nil, nil, nil, nil, nil, nil),
+		allocation.NewInMemoryManager(nil, nil, nil, nil, nil, nil),
	)
	workers := w.(*podWorkers)
	workers.clock = clock

@@ -2135,7 +2134,7 @@ func TestFakePodWorkers(t *testing.T) {
		time.Second,
		time.Second,
		fakeCache,
-		allocation.NewInMemoryManager(cm.NodeConfig{}, nil, nil, nil, nil, nil, nil, nil),
+		allocation.NewInMemoryManager(nil, nil, nil, nil, nil, nil),
	)
	fakePodWorkers := &fakePodWorkers{
		syncPodFn: kubeletForFakeWorkers.SyncPod,

@@ -281,6 +281,11 @@ func (m *manager) SetPodResizePendingCondition(podUID types.UID, reason, message

	previousCondition := m.podResizeConditions[podUID].PodResizePending

+	if reason != v1.PodReasonInfeasible {
+		// For all other "pending" reasons, we set the reason to "Deferred".
+		reason = v1.PodReasonDeferred
+	}
+
	m.podResizeConditions[podUID] = podResizeConditions{
		PodResizePending: updatedPodResizeCondition(v1.PodResizePending, m.podResizeConditions[podUID].PodResizePending, reason, message, observedGeneration),
		PodResizeInProgress: m.podResizeConditions[podUID].PodResizeInProgress,

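The added guard means callers no longer have to normalize the reason themselves: anything other than Infeasible is recorded as Deferred, which the test change below exercises by passing an arbitrary "some-reason". A standalone sketch of the same rule, with the helper name made up for illustration:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// normalizeResizePendingReason reproduces the guard above as a pure function.
func normalizeResizePendingReason(reason string) string {
	if reason != v1.PodReasonInfeasible {
		return v1.PodReasonDeferred
	}
	return reason
}

func main() {
	fmt.Println(normalizeResizePendingReason("some-reason"))          // Deferred
	fmt.Println(normalizeResizePendingReason(v1.PodReasonInfeasible)) // Infeasible
}
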
@@ -2161,7 +2161,7 @@ func TestPodResizeConditions(t *testing.T) {
		{
			name: "set pod resize pending condition to deferred with message",
			updateFunc: func(podUID types.UID) bool {
-				return m.SetPodResizePendingCondition(podUID, v1.PodReasonDeferred, "some-message", 1)
+				return m.SetPodResizePendingCondition(podUID, "some-reason", "some-message", 1)
			},
			expectedUpdateFuncReturnVal: true,
			expected: []*v1.PodCondition{