Kubelet changes to support IPPR + PodLevelResources

Signed-off-by: ndixita <ndixita@google.com>
This commit is contained in:
ndixita 2025-10-22 13:53:47 +00:00
parent 7645eb70e9
commit 11ff4efcde
No known key found for this signature in database
GPG key ID: 83041EA80D61856F
19 changed files with 778 additions and 22 deletions

View file

@ -440,6 +440,15 @@ func updatePodFromAllocation(pod *v1.Pod, allocated state.PodResourceInfo) (*v1.
}
updated := false
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
pAlloc := allocated.PodLevelResources
if !apiequality.Semantic.DeepEqual(pod.Spec.Resources, pAlloc) {
// Allocation differs from pod spec, retrieve the allocation
pod = pod.DeepCopy()
pod.Spec.Resources = pAlloc
updated = true
}
}
containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
if cAlloc, ok := allocated.ContainerResources[c.Name]; ok {
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
@ -477,6 +486,9 @@ func (m *manager) SetAllocatedResources(pod *v1.Pod) error {
func allocationFromPod(pod *v1.Pod) state.PodResourceInfo {
var podAlloc state.PodResourceInfo
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) && pod.Spec.Resources != nil {
podAlloc.PodLevelResources = pod.Spec.Resources.DeepCopy()
}
podAlloc.ContainerResources = make(map[string]v1.ResourceRequirements)
for _, container := range pod.Spec.Containers {
alloc := *container.Resources.DeepCopy()
@ -559,6 +571,15 @@ func (m *manager) handlePodResourcesResize(pod *v1.Pod) (bool, error) {
return false, nil
}
if !apiequality.Semantic.DeepEqual(pod.Spec.Resources, allocatedPod.Spec.Resources) {
if resizable, msg, reason := IsInPlacePodLevelResourcesVerticalScalingAllowed(pod); !resizable {
// If there is a pending pod-level resources resize but the resize is not allowed, always use the allocated resources.
metrics.PodInfeasibleResizes.WithLabelValues(reason).Inc()
m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg, pod.Generation)
return false, nil
}
}
// Desired resources != allocated resources. Can we update the allocation to the desired resources?
fit, reason, message := m.canResizePod(m.getAllocatedPods(m.getActivePods()), pod)
if fit {

View file

@ -130,12 +130,25 @@ func TestUpdatePodFromAllocation(t *testing.T) {
resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)
resizedPodWithPodLevelResources := resizedPod.DeepCopy()
resizedPodWithPodLevelResources.Spec.Resources = &v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(1500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(1700, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(2500, resource.DecimalSI),
},
}
tests := []struct {
name string
pod *v1.Pod
allocated state.PodResourceInfo
expectPod *v1.Pod
expectUpdate bool
name string
pod *v1.Pod
allocated state.PodResourceInfo
expectPod *v1.Pod
expectUpdate bool
inPlacePodLevelResizeEnabled bool
}{{
name: "steady state",
pod: pod,
@ -175,10 +188,28 @@ func TestUpdatePodFromAllocation(t *testing.T) {
},
expectUpdate: true,
expectPod: resizedPod,
}, {
name: "with resized pod-level resource allocation",
pod: pod,
allocated: state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
},
PodLevelResources: resizedPodWithPodLevelResources.Spec.Resources.DeepCopy(),
},
expectUpdate: true,
expectPod: resizedPodWithPodLevelResources,
inPlacePodLevelResizeEnabled: true,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.inPlacePodLevelResizeEnabled {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodLevelResourcesVerticalScaling, true)
}
pod := test.pod.DeepCopy()
allocatedPod, updated := updatePodFromAllocation(pod, test.allocated)

View file

@ -35,3 +35,10 @@ func IsInPlacePodVerticalScalingAllowed(pod *v1.Pod) (allowed bool, msg, reason
}
return true, "", ""
}
// IsInPlacePodLevelResourcesVerticalScalingAllowed reports whether in-place
// resize of pod-level resources is permitted. It is allowed only when the
// InPlacePodLevelResourcesVerticalScaling feature gate is enabled; otherwise
// a human-readable message and a short reason string are returned together
// with allowed=false.
func IsInPlacePodLevelResourcesVerticalScalingAllowed(pod *v1.Pod) (allowed bool, msg, reason string) {
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
		return true, "", ""
	}
	return false, "InPlacePodLevelResourcesVerticalScaling is disabled", "plr_feature_gate_off"
}

View file

@ -24,3 +24,7 @@ import v1 "k8s.io/api/core/v1"
func IsInPlacePodVerticalScalingAllowed(_ *v1.Pod) (allowed bool, msg, reason string) {
return false, "In-place pod resize is not supported on this node", "unsupported_platform"
}
// IsInPlacePodLevelResourcesVerticalScalingAllowed always reports that in-place
// resize of pod-level resources is not allowed in this build. The pod argument
// is not inspected; a fixed message and the "unsupported_platform" reason are
// returned.
func IsInPlacePodLevelResourcesVerticalScalingAllowed(pod *v1.Pod) (allowed bool, msg, reason string) {
return false, "In-place pod-level resources resize is not supported on this node", "unsupported_platform"
}

View file

@ -24,3 +24,7 @@ import v1 "k8s.io/api/core/v1"
func IsInPlacePodVerticalScalingAllowed(_ *v1.Pod) (allowed bool, msg, reason string) {
return false, "In-place pod resize is not supported on Windows", "windows"
}
// IsInPlacePodLevelResourcesVerticalScalingAllowed always reports that in-place
// resize of pod-level resources is not allowed in this build. The pod argument
// is not inspected; a fixed message and the "windows" reason are returned.
func IsInPlacePodLevelResourcesVerticalScalingAllowed(pod *v1.Pod) (allowed bool, msg, reason string) {
return false, "In-place pod-level resources resize is not supported on Windows", "windows"
}

View file

@ -26,6 +26,7 @@ import (
type PodResourceInfo struct {
// ContainerResources maps container names to their respective ResourceRequirements.
ContainerResources map[string]v1.ResourceRequirements
// PodLevelResources holds the ResourceRequirements tracked at pod level.
// It is a pointer and is nil when no pod-level resources are recorded.
PodLevelResources *v1.ResourceRequirements
}
// PodResourceInfoMap maps pod UIDs to their corresponding PodResourceInfo,
@ -38,6 +39,7 @@ func (pr PodResourceInfoMap) Clone() PodResourceInfoMap {
for podUID, podInfo := range pr {
prCopy[podUID] = PodResourceInfo{
ContainerResources: make(map[string]v1.ResourceRequirements),
PodLevelResources: podInfo.PodLevelResources.DeepCopy(),
}
for containerName, containerInfo := range podInfo.ContainerResources {
prCopy[podUID].ContainerResources[containerName] = *containerInfo.DeepCopy()
@ -51,11 +53,13 @@ type Reader interface {
GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
GetPodResourceInfoMap() PodResourceInfoMap
GetPodResourceInfo(podUID types.UID) (PodResourceInfo, bool)
GetPodLevelResources(podUID types.UID) (*v1.ResourceRequirements, bool)
}
type writer interface {
SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error
SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error
SetPodLevelResources(podUID types.UID, alloc *v1.ResourceRequirements) error
RemovePod(podUID types.UID) error
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID])

View file

@ -112,6 +112,13 @@ func (sc *stateCheckpoint) GetContainerResources(podUID types.UID, containerName
return sc.cache.GetContainerResources(podUID, containerName)
}
// GetPodLevelResources looks up the pod-level resources for podUID in the
// underlying cache while holding the read lock. The boolean is whatever the
// cache reports for the lookup.
func (sc *stateCheckpoint) GetPodLevelResources(podUID types.UID) (*v1.ResourceRequirements, bool) {
	sc.mux.RLock()
	defer sc.mux.RUnlock()

	podResources, found := sc.cache.GetPodLevelResources(podUID)
	return podResources, found
}
// GetPodResourceInfoMap returns current pod resource information map
func (sc *stateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
sc.mux.RLock()
@ -137,6 +144,17 @@ func (sc *stateCheckpoint) SetContainerResources(podUID types.UID, containerName
return sc.storeState()
}
// SetPodLevelResources writes the pod-level resources for podUID into the
// cache and, when that succeeds, persists the state through storeState. The
// write lock is held for the whole operation. A cache error is returned
// unchanged and nothing is persisted in that case.
func (sc *stateCheckpoint) SetPodLevelResources(podUID types.UID, resInfo *v1.ResourceRequirements) error {
	sc.mux.Lock()
	defer sc.mux.Unlock()

	if cacheErr := sc.cache.SetPodLevelResources(podUID, resInfo); cacheErr != nil {
		return cacheErr
	}
	return sc.storeState()
}
// SetPodResourceInfo sets pod resource information
func (sc *stateCheckpoint) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
sc.mux.Lock()
@ -175,6 +193,10 @@ func (sc *noopStateCheckpoint) GetContainerResources(_ types.UID, _ string) (v1.
return v1.ResourceRequirements{}, false
}
// GetPodLevelResources is a no-op: it ignores the pod UID and always returns
// (nil, false).
func (sc *noopStateCheckpoint) GetPodLevelResources(_ types.UID) (*v1.ResourceRequirements, bool) {
return nil, false
}
func (sc *noopStateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
return nil
}
@ -187,6 +209,10 @@ func (sc *noopStateCheckpoint) SetContainerResources(_ types.UID, _ string, _ v1
return nil
}
// SetPodLevelResources is a no-op: it discards its arguments and always
// returns nil.
func (sc *noopStateCheckpoint) SetPodLevelResources(_ types.UID, _ *v1.ResourceRequirements) error {
return nil
}
func (sc *noopStateCheckpoint) SetPodResourceInfo(_ types.UID, _ PodResourceInfo) error {
return nil
}

View file

@ -101,6 +101,12 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
},
},
},
PodLevelResources: &v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
},
},
},
},
},
@ -142,6 +148,7 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
ContainerResources: map[string]v1.ResourceRequirements{
"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
},
PodLevelResources: &v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
}))
require.FileExists(t, checkpointPath, "checkpoint should be re-written")
})

View file

@ -59,6 +59,19 @@ func (s *stateMemory) GetContainerResources(podUID types.UID, containerName stri
return *resources.DeepCopy(), ok
}
// GetPodLevelResources returns a deep copy of the pod-level resources stored
// for podUID. The boolean is false, and the result nil, when the pod has no
// entry in the in-memory state.
func (s *stateMemory) GetPodLevelResources(podUID types.UID) (*v1.ResourceRequirements, bool) {
	s.RLock()
	defer s.RUnlock()

	if info, tracked := s.podResources[podUID]; tracked {
		// Hand out a copy so callers cannot mutate the stored state.
		return info.PodLevelResources.DeepCopy(), true
	}
	return nil, false
}
func (s *stateMemory) GetPodResourceInfoMap() PodResourceInfoMap {
s.RLock()
defer s.RUnlock()
@ -77,17 +90,41 @@ func (s *stateMemory) SetContainerResources(podUID types.UID, containerName stri
s.Lock()
defer s.Unlock()
if _, ok := s.podResources[podUID]; !ok {
s.podResources[podUID] = PodResourceInfo{
podInfo, ok := s.podResources[podUID]
if !ok {
podInfo = PodResourceInfo{
ContainerResources: make(map[string]v1.ResourceRequirements),
}
}
s.podResources[podUID].ContainerResources[containerName] = resources
if podInfo.ContainerResources == nil {
podInfo.ContainerResources = make(map[string]v1.ResourceRequirements)
}
podInfo.ContainerResources[containerName] = resources
s.podResources[podUID] = podInfo
klog.V(3).InfoS("Updated container resource information", "podUID", podUID, "containerName", containerName, "resources", resources)
return nil
}
// SetPodLevelResources records the pod-level resources for podUID, creating
// the pod's entry when it does not exist yet. Any container resources already
// stored for the pod are left untouched. A deep copy of resources is stored so
// that later mutation by the caller cannot change the state outside the lock;
// a nil resources value is stored as nil. It always returns nil.
func (s *stateMemory) SetPodLevelResources(podUID types.UID, resources *v1.ResourceRequirements) error {
	s.Lock()
	defer s.Unlock()

	// A missing entry yields the zero PodResourceInfo, which is a valid
	// starting point: only the pod-level field is populated here.
	podInfo := s.podResources[podUID]
	podInfo.PodLevelResources = resources.DeepCopy()
	s.podResources[podUID] = podInfo

	klog.V(3).InfoS("Updated pod-level resource info", "podUID", podUID, "resources", resources)
	return nil
}
func (s *stateMemory) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
s.Lock()
defer s.Unlock()

View file

@ -19,17 +19,18 @@ package cm
import (
"bufio"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
libcontainercgroups "github.com/opencontainers/cgroups"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
resourcehelper "k8s.io/component-helpers/resource"
"k8s.io/klog/v2"
"k8s.io/component-helpers/resource"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
kubefeatures "k8s.io/kubernetes/pkg/features"
@ -125,7 +126,7 @@ func HugePageLimits(resourceList v1.ResourceList) map[int64]int64 {
func ResourceConfigForPod(allocatedPod *v1.Pod, enforceCPULimits bool, cpuPeriod uint64, enforceMemoryQoS bool) *ResourceConfig {
podLevelResourcesEnabled := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodLevelResources)
// sum requests and limits.
reqs := resource.PodRequests(allocatedPod, resource.PodResourcesOptions{
reqs := resourcehelper.PodRequests(allocatedPod, resourcehelper.PodResourcesOptions{
// SkipPodLevelResources is set to false when PodLevelResources feature is enabled.
SkipPodLevelResources: !podLevelResourcesEnabled,
UseStatusResources: false,
@ -134,10 +135,10 @@ func ResourceConfigForPod(allocatedPod *v1.Pod, enforceCPULimits bool, cpuPeriod
memoryLimitsDeclared := true
cpuLimitsDeclared := true
limits := resource.PodLimits(allocatedPod, resource.PodResourcesOptions{
limits := resourcehelper.PodLimits(allocatedPod, resourcehelper.PodResourcesOptions{
// SkipPodLevelResources is set to false when PodLevelResources feature is enabled.
SkipPodLevelResources: !podLevelResourcesEnabled,
ContainerFn: func(res v1.ResourceList, containerType resource.ContainerType) {
ContainerFn: func(res v1.ResourceList, containerType resourcehelper.ContainerType) {
if res.Cpu().IsZero() {
cpuLimitsDeclared = false
}
@ -147,7 +148,7 @@ func ResourceConfigForPod(allocatedPod *v1.Pod, enforceCPULimits bool, cpuPeriod
},
})
if podLevelResourcesEnabled && resource.IsPodLevelResourcesSet(allocatedPod) {
if podLevelResourcesEnabled && resourcehelper.IsPodLevelResourcesSet(allocatedPod) {
if !allocatedPod.Spec.Resources.Limits.Cpu().IsZero() {
cpuLimitsDeclared = true
}
@ -342,3 +343,57 @@ func GetKubeletContainer(logger klog.Logger, kubeletCgroups string) (string, err
}
return kubeletCgroups, nil
}
// CPURequestsFromConfig converts the CPU shares in podConfig into a CPU
// request quantity expressed in milli-CPU.
//
// It returns nil when podConfig is nil, when CPUShares is unset or not
// positive, or when the converted milli-CPU value is not positive.
func CPURequestsFromConfig(podConfig *ResourceConfig) *resource.Quantity {
	// CPUShares is a pointer field; guard it so a config without CPU shares
	// does not cause a nil dereference.
	if podConfig == nil || podConfig.CPUShares == nil {
		return nil
	}
	if shares := *podConfig.CPUShares; shares > 0 {
		if milliCPU := sharesToMilliCPU(int64(shares)); milliCPU > 0 {
			return resource.NewMilliQuantity(milliCPU, resource.DecimalSI)
		}
	}
	return nil
}
// CPULimitsFromConfig converts the CPU quota and period in podConfig into a
// CPU limit quantity expressed in milli-CPU.
//
// It returns nil when podConfig is nil, when CPUPeriod or CPUQuota is unset,
// when the period is not positive, or when the converted milli-CPU value is
// not positive.
func CPULimitsFromConfig(podConfig *ResourceConfig) *resource.Quantity {
	// Both fields are pointers; guard them so a partially populated config
	// does not cause a nil dereference.
	if podConfig == nil || podConfig.CPUPeriod == nil || podConfig.CPUQuota == nil {
		return nil
	}
	if period := *podConfig.CPUPeriod; period > 0 {
		if milliCPU := quotaToMilliCPU(*podConfig.CPUQuota, int64(period)); milliCPU > 0 {
			return resource.NewMilliQuantity(milliCPU, resource.DecimalSI)
		}
	}
	return nil
}
// MemoryLimitsFromConfig converts the memory value in podConfig into a memory
// limit quantity.
//
// It returns nil when podConfig is nil, or when Memory is unset or not
// positive.
func MemoryLimitsFromConfig(podConfig *ResourceConfig) *resource.Quantity {
	// Memory is a pointer field; guard it so a config without a memory value
	// does not cause a nil dereference.
	if podConfig == nil || podConfig.Memory == nil {
		return nil
	}
	if memory := *podConfig.Memory; memory > int64(0) {
		return resource.NewQuantity(memory, resource.BinarySI)
	}
	return nil
}
// sharesToMilliCPU converts CpuShares (cpu.shares) to milli-CPU value.
// Share values below MinShares map to zero; otherwise the result is rounded
// up to the next whole milli-CPU.
// TODO - dedup sharesToMilliCPU with sharesToMilliCPU in pkg/kubelet/kuberuntime/helpers_linux.go
func sharesToMilliCPU(shares int64) int64 {
	if shares < int64(MinShares) {
		return int64(0)
	}
	scaled := float64(shares * MilliCPUToCPU)
	return int64(math.Ceil(scaled / float64(SharesPerCPU)))
}
// quotaToMilliCPU converts cpu.cfs_quota_us and cpu.cfs_period_us to milli-CPU
// value. A quota of -1 yields 0. A period of 0 also yields 0 rather than
// dividing by zero.
// TODO - dedup quotaToMilliCPU with quotaToMilliCPU in pkg/kubelet/kuberuntime/helpers_linux.go
func quotaToMilliCPU(quota int64, period int64) int64 {
	if quota == -1 || period == 0 {
		return int64(0)
	}
	return (quota * MilliCPUToCPU) / period
}

View file

@ -21,6 +21,7 @@ package cm
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
@ -75,3 +76,15 @@ func NodeAllocatableRoot(cgroupRoot string, cgroupsPerQOS bool, cgroupDriver str
func GetKubeletContainer(logger klog.Logger, kubeletCgroups string) (string, error) {
return "", nil
}
// CPURequestsFromConfig is a stub in this build: it ignores podConfig and
// always returns nil.
func CPURequestsFromConfig(podConfig *ResourceConfig) *resource.Quantity {
return nil
}
// CPULimitsFromConfig is a stub in this build: it ignores podConfig and
// always returns nil.
func CPULimitsFromConfig(podConfig *ResourceConfig) *resource.Quantity {
return nil
}
// MemoryLimitsFromConfig is a stub in this build: it ignores podConfig and
// always returns nil.
func MemoryLimitsFromConfig(podConfig *ResourceConfig) *resource.Quantity {
return nil
}

View file

@ -100,6 +100,7 @@ func (m *podContainerManagerImpl) EnsureExists(logger klog.Logger, pod *v1.Pod)
if err := m.cgroupManager.Create(logger, containerConfig); err != nil {
return fmt.Errorf("failed to create container for %v : %v", podContainerName, err)
}
}
return nil
}

View file

@ -145,6 +145,8 @@ type Runtime interface {
// IsPodResizeInProgress checks whether the given pod is in the process of resizing
// (allocated resources != actuated resources).
IsPodResizeInProgress(allocatedPod *v1.Pod, podStatus *PodStatus) bool
// UpdateActuatedPodLevelResources updates pod-level resources in actuatedState
UpdateActuatedPodLevelResources(actuatedPod *v1.Pod) error
}
// StreamingRuntime is the interface implemented by runtimes that handle the serving of the

View file

@ -573,3 +573,7 @@ func (f *FakeRuntime) GetContainerSwapBehavior(pod *v1.Pod, container *v1.Contai
func (f *FakeRuntime) IsPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
return f.PodResizeInProgress
}
// UpdateActuatedPodLevelResources is a fake implementation that does nothing
// with the given pod and always returns nil.
func (f *FakeRuntime) UpdateActuatedPodLevelResources(allocatedPod *v1.Pod) error {
return nil
}

View file

@ -1611,6 +1611,57 @@ func (_c *MockRuntime_Type_Call) RunAndReturn(run func() string) *MockRuntime_Ty
return _c
}
// UpdateActuatedPodLevelResources provides a mock function for the type MockRuntime
func (_mock *MockRuntime) UpdateActuatedPodLevelResources(actuatedPod *v10.Pod) error {
ret := _mock.Called(actuatedPod)
if len(ret) == 0 {
panic("no return value specified for UpdateActuatedPodLevelResources")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(*v10.Pod) error); ok {
r0 = returnFunc(actuatedPod)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRuntime_UpdateActuatedPodLevelResources_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateActuatedPodLevelResources'
type MockRuntime_UpdateActuatedPodLevelResources_Call struct {
*mock.Call
}
// UpdateActuatedPodLevelResources is a helper method to define mock.On call
// - actuatedPod *v10.Pod
func (_e *MockRuntime_Expecter) UpdateActuatedPodLevelResources(actuatedPod interface{}) *MockRuntime_UpdateActuatedPodLevelResources_Call {
return &MockRuntime_UpdateActuatedPodLevelResources_Call{Call: _e.mock.On("UpdateActuatedPodLevelResources", actuatedPod)}
}
func (_c *MockRuntime_UpdateActuatedPodLevelResources_Call) Run(run func(actuatedPod *v10.Pod)) *MockRuntime_UpdateActuatedPodLevelResources_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 *v10.Pod
if args[0] != nil {
arg0 = args[0].(*v10.Pod)
}
run(
arg0,
)
})
return _c
}
func (_c *MockRuntime_UpdateActuatedPodLevelResources_Call) Return(err error) *MockRuntime_UpdateActuatedPodLevelResources_Call {
_c.Call.Return(err)
return _c
}
func (_c *MockRuntime_UpdateActuatedPodLevelResources_Call) RunAndReturn(run func(actuatedPod *v10.Pod) error) *MockRuntime_UpdateActuatedPodLevelResources_Call {
_c.Call.Return(run)
return _c
}
// UpdatePodCIDR provides a mock function for the type MockRuntime
func (_mock *MockRuntime) UpdatePodCIDR(ctx context.Context, podCIDR string) error {
ret := _mock.Called(ctx, podCIDR)

View file

@ -46,6 +46,7 @@ import (
ndffeatures "k8s.io/component-helpers/nodedeclaredfeatures/features"
"k8s.io/mount-utils"
apiequality "k8s.io/apimachinery/pkg/api/equality"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
@ -2097,6 +2098,10 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
if err = kl.containerRuntime.UpdateActuatedPodLevelResources(pod); err != nil {
return false, fmt.Errorf("failed to update the state of pod-level resources for the pod %v : %w", pod.UID, err)
}
}
}
}
@ -2799,7 +2804,7 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if recordContainerResizeOperations(oldPod, pod) {
if recordResizeOperations(oldPod, pod) {
_, updatedFromAllocation := kl.allocationManager.UpdatePodFromAllocation(pod)
if updatedFromAllocation {
kl.allocationManager.PushPendingResize(pod.UID)
@ -2844,14 +2849,37 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
}
}
// recordContainerResizeOperations records if any of the pod's containers needs to be resized, and returns
// recordResizeOperations records if any of the pod level resources or
// containers need to be resized, and returns
// true if so
func recordContainerResizeOperations(oldPod, newPod *v1.Pod) bool {
hasResize := false
func recordResizeOperations(oldPod, newPod *v1.Pod) bool {
	if oldPod == nil {
		// This should never happen.
		return true
	}
	// Run both recorders unconditionally. Combining the calls directly with ||
	// would short-circuit and skip the pod-level recorder whenever a
	// container-level resize is detected.
	containerResize := recordContainerResizeOperations(oldPod, newPod)
	podLevelResize := recordPodLevelResourceResizeOperations(oldPod, newPod)
	return containerResize || podLevelResize
}
// recordPodLevelResourceResizeOperations records if any of the pod level resources need to be resized, and returns
// true if so. It always returns false when the InPlacePodLevelResourcesVerticalScaling
// feature gate is disabled. Both pods must be non-nil.
func recordPodLevelResourceResizeOperations(oldPod, newPod *v1.Pod) bool {
	// Gate on the pod-level resize feature specifically, not on the general
	// in-place resize gate.
	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
		return false
	}
	// TODO(ndixita): add metrics for pod-level resources resize.
	return !apiequality.Semantic.DeepEqual(oldPod.Spec.Resources, newPod.Spec.Resources)
}
// recordContainerResizeOperations records if any of the pod's containers needs to be resized, and returns
// true if so
func recordContainerResizeOperations(oldPod, newPod *v1.Pod) bool {
hasResize := false
for oldContainer, containerType := range podutil.ContainerIter(&oldPod.Spec, podutil.InitContainers|podutil.Containers) {
if !allocation.IsResizableContainer(oldContainer, containerType) {
continue
@ -2970,8 +2998,9 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
// resources changing.
if hasPendingResizes && !retryPendingResizes && oldPod != nil {
opts := resourcehelper.PodResourcesOptions{
UseStatusResources: true,
SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
UseStatusResources: true,
SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
InPlacePodLevelResourcesVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling),
}
// Ignore desired resources when aggregating the resources.

View file

@ -44,6 +44,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilvalidation "k8s.io/apimachinery/pkg/util/validation"
utilfeature "k8s.io/apiserver/pkg/util/feature"
resourcehelper "k8s.io/component-helpers/resource"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubelet/pkg/cri/streaming/portforward"
@ -2123,9 +2124,107 @@ func (kl *Kubelet) convertStatusToAPIStatus(pod *v1.Pod, podStatus *kubecontaine
podRestarting,
)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
apiPodStatus.Resources = kl.convertToAPIPodLevelResourcesStatus(pod, oldPodStatus)
opts := resourcehelper.PodResourcesOptions{
SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
}
apiPodStatus.AllocatedResources = resourcehelper.PodRequests(pod, opts)
}
return &apiPodStatus
}
// convertToAPIPodLevelResourcesStatus builds the pod-level resources to report
// in the API pod status for allocatedPod.
//
// For a pod that is not Running, a copy of the allocated spec resources is
// returned unchanged (nil if the spec has none). For a Running pod the result
// starts from a copy of the allocated spec resources and is overlaid with the
// CPU request, CPU limit and memory limit derived from the pod cgroup
// configuration. Where a cgroup-derived value is unavailable, the
// corresponding value from oldPodStatus.Resources is carried over, provided
// the previous status was also Running and had resources recorded.
//
// oldPodStatus is received by value, so defaulting its Resources field below
// does not affect the caller's copy.
func (kl *Kubelet) convertToAPIPodLevelResourcesStatus(allocatedPod *v1.Pod, oldPodStatus v1.PodStatus) *v1.ResourceRequirements {
	if allocatedPod.Status.Phase != v1.PodRunning {
		return allocatedPod.Spec.Resources.DeepCopy()
	}

	pcm := kl.containerManager.NewPodContainerManager()

	// Cgroup read failures are logged and tolerated: the status is still
	// assembled from whatever values could be obtained.
	memoryConfig, err := pcm.GetPodCgroupConfig(allocatedPod, v1.ResourceMemory)
	if err != nil {
		klog.ErrorS(err, "failed to read memory cgroup config for the pod", "podName", allocatedPod.Name)
	}
	memoryLimit := cm.MemoryLimitsFromConfig(memoryConfig)

	cpuConfig, err := pcm.GetPodCgroupConfig(allocatedPod, v1.ResourceCPU)
	if err != nil {
		klog.ErrorS(err, "failed to read cpu cgroup config for the pod", "podName", allocatedPod.Name)
	}
	cpuRequest := cm.CPURequestsFromConfig(cpuConfig)
	cpuLimit := cm.CPULimitsFromConfig(cpuConfig)

	// preserveOldResourcesValue copies rName from the previous status list
	// into the list being built, but only when both the current and the
	// previous status are Running and the previous status had resources.
	preserveOldResourcesValue := func(rName v1.ResourceName, oldStatusResource, resource v1.ResourceList) {
		if allocatedPod.Status.Phase == v1.PodRunning && oldPodStatus.Phase == v1.PodRunning && oldPodStatus.Resources != nil {
			if r, exists := oldStatusResource[rName]; exists {
				resource[rName] = r.DeepCopy()
			}
		}
	}

	resources := allocatedPod.Spec.Resources.DeepCopy()
	// Default the nil pointers and maps so the lookups and writes below are safe.
	if oldPodStatus.Resources == nil {
		oldPodStatus.Resources = &v1.ResourceRequirements{}
	}
	if resources == nil {
		resources = &v1.ResourceRequirements{}
	}
	if resources.Requests == nil {
		resources.Requests = make(v1.ResourceList)
	}
	if resources.Limits == nil {
		resources.Limits = make(v1.ResourceList)
	}

	if cpuRequest != nil {
		// If both the allocated & actual resources are at
		// or below MinShares, preserve the allocated value in the API to avoid
		// confusion and simplify comparisons.
		if cpuRequest.MilliValue() > cm.MinShares || resources.Requests.Cpu().MilliValue() > cm.MinShares {
			resources.Requests[v1.ResourceCPU] = cpuRequest.DeepCopy()
		}
	} else {
		preserveOldResourcesValue(v1.ResourceCPU, oldPodStatus.Resources.Requests, resources.Requests)
	}

	// When the allocated spec carries no pod-level memory request, fall back
	// to the memory request computed by PodRequests for the pod.
	if _, found := resources.Requests[v1.ResourceMemory]; !found {
		opts := resourcehelper.PodResourcesOptions{
			SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
		}
		aggregatedResources := resourcehelper.PodRequests(allocatedPod, opts)
		resources.Requests[v1.ResourceMemory] = aggregatedResources[v1.ResourceMemory]
	}

	// TODO: Once we begin persisting memory Request from the PodSpec to cgroups,
	// the code needs to persist that value if it is non-nil.
	preserveOldResourcesValue(v1.ResourceMemory, oldPodStatus.Resources.Requests, resources.Requests)

	if cpuLimit != nil {
		// If both the allocated & actual resources are at
		// or below the minimum effective limit, preserve the
		// allocated value in the API to avoid confusion and simplify comparisons.
		if cpuLimit.MilliValue() > cm.MinMilliCPULimit || resources.Limits.Cpu().MilliValue() > cm.MinMilliCPULimit {
			resources.Limits[v1.ResourceCPU] = cpuLimit.DeepCopy()
		}
	} else {
		preserveOldResourcesValue(v1.ResourceCPU, oldPodStatus.Resources.Limits, resources.Limits)
	}

	if memoryLimit != nil {
		resources.Limits[v1.ResourceMemory] = memoryLimit.DeepCopy()
	} else {
		preserveOldResourcesValue(v1.ResourceMemory, oldPodStatus.Resources.Limits, resources.Limits)
	}

	return resources
}
// convertToAPIContainerStatuses converts the given internal container
// statuses into API container statuses.
func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecontainer.PodStatus, previousStatus []v1.ContainerStatus, containers []v1.Container, hasInitContainers, isInitContainer, podRestarting bool) []v1.ContainerStatus {

View file

@ -596,6 +596,14 @@ type podActions struct {
ContainersToReset []containerToRemoveInfo
}
// podLevelResources holds the set of resources applicable to the running pod
// as plain integers, so two values can be compared directly with == / !=.
type podLevelResources struct {
// memoryLimit is the memory limit, taken from Quantity.Value().
memoryLimit int64
// memoryRequest is the memory request, taken from Quantity.Value().
memoryRequest int64
// cpuLimit is the CPU limit in milli-CPU, taken from Quantity.MilliValue().
cpuLimit int64
// cpuRequest is the CPU request in milli-CPU, taken from Quantity.MilliValue().
cpuRequest int64
}
func (p podActions) String() string {
return fmt.Sprintf("KillPod: %t, CreateSandbox: %t, UpdatePodResources: %t, Attempt: %d, InitContainersToStart: %v, ContainersToStart: %v, EphemeralContainersToStart: %v,ContainersToUpdate: %v, ContainersToKill: %v, ContainersToRemove: %v",
p.KillPod, p.CreateSandbox, p.UpdatePodResources, p.Attempt, p.InitContainersToStart, p.ContainersToStart, p.EphemeralContainersToStart, p.ContainersToUpdate, p.ContainersToKill, p.ContainersToReset)
@ -632,6 +640,19 @@ func containerResourcesFromRequirements(requirements *v1.ResourceRequirements) c
}
}
// podResourcesFromRequirements flattens pod-level ResourceRequirements into a
// podLevelResources value: memory as Quantity.Value() and CPU as
// Quantity.MilliValue(). A nil requirements pointer yields the zero value.
func podResourcesFromRequirements(requirements *v1.ResourceRequirements) podLevelResources {
	var flattened podLevelResources
	if requirements != nil {
		limits, requests := requirements.Limits, requirements.Requests
		flattened.memoryLimit = limits.Memory().Value()
		flattened.memoryRequest = requests.Memory().Value()
		flattened.cpuLimit = limits.Cpu().MilliValue()
		flattened.cpuRequest = requests.Cpu().MilliValue()
	}
	return flattened
}
// computePodResizeAction determines the actions required (if any) to resize the given container.
// Returns whether to keep (true) or restart (false) the container.
// TODO(vibansal): Make this function to be agnostic to whether it is dealing with a restartable init container or not (i.e. remove the argument `isRestartableInitContainer`).
@ -781,6 +802,57 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(ctx context.Context, pod *
return resizeResult
}
updateActuatedPodLevelResources := func(resourceName v1.ResourceName) error {
allocatedResources := pod.Spec.Resources
if allocatedResources == nil {
return nil
}
// allocated resources will never be nil
actuatedPodResources, found := m.actuatedState.GetPodLevelResources(pod.UID)
if !found || actuatedPodResources == nil {
logger.Error(nil, "Missing actuated resource record", "pod", klog.KObj(pod), "pod", pod.Name)
// Proceed with the zero-value actuated resources.
actuatedPodResources = &v1.ResourceRequirements{}
}
defaultResourceListIfNil := func(rl v1.ResourceList) v1.ResourceList {
if rl == nil {
return make(v1.ResourceList)
}
return rl
}
switch resourceName {
case v1.ResourceMemory:
if allocatedResources.Requests != nil {
actuatedPodResources.Requests = defaultResourceListIfNil(actuatedPodResources.Requests)
actuatedPodResources.Requests[v1.ResourceMemory] = allocatedResources.Requests[v1.ResourceMemory]
}
if allocatedResources.Limits != nil {
actuatedPodResources.Limits = defaultResourceListIfNil(actuatedPodResources.Limits)
actuatedPodResources.Limits[v1.ResourceMemory] = allocatedResources.Limits[v1.ResourceMemory]
}
case v1.ResourceCPU:
if allocatedResources.Requests != nil {
actuatedPodResources.Requests = defaultResourceListIfNil(actuatedPodResources.Requests)
actuatedPodResources.Requests[v1.ResourceCPU] = allocatedResources.Requests[v1.ResourceCPU]
}
if allocatedResources.Limits != nil {
actuatedPodResources.Limits = defaultResourceListIfNil(actuatedPodResources.Limits)
actuatedPodResources.Limits[v1.ResourceCPU] = allocatedResources.Limits[v1.ResourceCPU]
}
}
if err = m.actuatedState.SetPodLevelResources(pod.UID, actuatedPodResources); err != nil {
logger.Error(err, "SetPodLevelResources failed", "pod", pod.Name, "UID", pod.UID,
"pod", format.Pod(pod), "resourceName", resourceName)
return err
}
return nil
}
setPodCgroupConfig := func(logger klog.Logger, rName v1.ResourceName, setLimitValue bool) error {
var err error
resizedResources := &cm.ResourceConfig{}
@ -809,8 +881,16 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(ctx context.Context, pod *
logger.Error(err, "Failed to notify runtime for UpdatePodSandboxResources", "resource", rName, "pod", klog.KObj(pod))
// Don't propagate the error since the updatePodSandboxResources call is best-effort.
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
if err = updateActuatedPodLevelResources(rName); err != nil {
logger.Error(err, "Failed to update pod-level actuated resources", "resource", rName, "pod", klog.KObj(pod))
}
}
return nil
}
// Memory and CPU are updated separately because memory resizes may be ordered differently than CPU resizes.
// If resize results in net pod resource increase, set pod cgroup config before resizing containers.
// If resize results in net pod resource decrease, set pod cgroup config after resizing containers.
@ -1229,6 +1309,13 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
changes.InitContainersToStart = nil
}
// If no container-level resource updates were found, check for pod-level resource changes.
// The 'UpdatePodResources' is set if EITHER container-level OR pod-level
// resources have been modified
if !changes.UpdatePodResources {
changes.UpdatePodResources = m.computePodLevelResourcesResizeAction(ctx, pod)
}
return changes
}
@ -1264,6 +1351,25 @@ func (m *kubeGenericRuntimeManager) getContainersToReset(containers []v1.Contain
return
}
// computePodLevelResourcesResizeAction reports whether the pod-level resources
// requested in pod.Spec.Resources differ from the pod-level resources that were
// last recorded as actuated for the pod.
//
// It always returns false when the InPlacePodLevelResourcesVerticalScaling
// feature gate is disabled.
func (m *kubeGenericRuntimeManager) computePodLevelResourcesResizeAction(ctx context.Context, pod *v1.Pod) bool {
	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
		return false
	}

	logger := klog.FromContext(ctx)
	actuatedPodLevelResources, found := m.actuatedState.GetPodLevelResources(pod.UID)
	if !found {
		// Structured-logging keys must be unique; klog.KObj already identifies the pod.
		logger.Error(nil, "Missing actuated pod level resource record", "pod", klog.KObj(pod))
		// Proceed with the value returned for the missing record (presumably the
		// zero value — confirm against the actuated state store). The comparison
		// below then decides whether a pod-level resources update is needed.
	}

	desiredPodLevelResources := podResourcesFromRequirements(pod.Spec.Resources)
	currentPodLevelResources := podResourcesFromRequirements(actuatedPodLevelResources)
	return currentPodLevelResources != desiredPodLevelResources
}
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
@ -1963,12 +2069,38 @@ func (m *kubeGenericRuntimeManager) ListPodSandboxMetrics(ctx context.Context) (
return m.runtimeService.ListPodSandboxMetrics(ctx)
}
// UpdateActuatedPodLevelResources records actuatedPod.Spec.Resources as the
// actuated pod-level resources for the pod.
//
// It is a no-op (returning nil) when the InPlacePodLevelResourcesVerticalScaling
// feature gate is disabled, or when the pod declares neither pod-level requests
// nor pod-level limits. Otherwise it returns the error, if any, from storing
// the actuated state.
func (m *kubeGenericRuntimeManager) UpdateActuatedPodLevelResources(actuatedPod *v1.Pod) error {
	// Pod-level actuated state is only consumed by the pod-level in-place resize
	// paths, which are all gated on InPlacePodLevelResourcesVerticalScaling.
	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
		return nil
	}

	podResources := actuatedPod.Spec.Resources
	if podResources == nil || (podResources.Requests == nil && podResources.Limits == nil) {
		// Nothing was specified at pod level, so there is nothing to record.
		return nil
	}

	return m.actuatedState.SetPodLevelResources(actuatedPod.UID, podResources)
}
// IsPodResizeInProgress reports whether the actuated resizable resources differ
// from the allocated resources for either of the following:
//   - any running container. The following differences are ignored:
//   - Non-resizable containers: non-restartable init containers, ephemeral containers
//   - Non-resizable resources: only CPU & memory are resizable
//   - Non-running containers: they will be sized correctly when (re)started
//   - the pod itself (pod-level resources), if InPlacePodLevelResourcesVerticalScaling
//     is enabled.
func (m *kubeGenericRuntimeManager) IsPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
	// Container-level check runs first; the pod-level check is only consulted
	// when no container resize is in progress.
	return m.isContainerResourceResizeInProgress(allocatedPod, podStatus) ||
		m.isPodLevelResourcesResizeInProgress(allocatedPod, podStatus)
}
func (m *kubeGenericRuntimeManager) isContainerResourceResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
return !podutil.VisitContainers(&allocatedPod.Spec, podutil.InitContainers|podutil.Containers,
func(allocatedContainer *v1.Container, containerType podutil.ContainerType) (shouldContinue bool) {
if !isResizableContainer(allocatedContainer, containerType) {
@ -1991,6 +2123,54 @@ func (m *kubeGenericRuntimeManager) IsPodResizeInProgress(allocatedPod *v1.Pod,
})
}
// isPodLevelResourcesResizeInProgress reports whether the actuated pod-level CPU
// and memory resources differ from the allocated pod-level resources in
// allocatedPod.Spec.Resources.
//
// It returns false when the InPlacePodLevelResourcesVerticalScaling feature gate
// is disabled or when the allocated pod declares no pod-level resources.
// podStatus is currently unused.
func (m *kubeGenericRuntimeManager) isPodLevelResourcesResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodLevelResourcesVerticalScaling) {
		return false
	}

	allocatedPodResources := allocatedPod.Spec.Resources
	if allocatedPodResources == nil {
		return false
	}

	// A missing actuated record is compared as-is; cpuMemoryResourcesEqual
	// accepts a nil actuated value.
	actuatedPodResources, _ := m.actuatedState.GetPodLevelResources(allocatedPod.UID)

	// A resize is in progress exactly when actuated and allocated do NOT match.
	return !cpuMemoryResourcesEqual(actuatedPodResources, allocatedPodResources)
}
// cpuMemoryResourcesEqual reports whether the CPU and memory requests and limits
// of actuatedPodResources and allocatedPodResources are equal. Resources other
// than CPU and memory are not compared.
//
// A nil *v1.ResourceRequirements, a nil Requests/Limits list, and a list without
// an entry for a resource all compare as the zero quantity. Neither argument is
// modified.
func cpuMemoryResourcesEqual(actuatedPodResources, allocatedPodResources *v1.ResourceRequirements) bool {
	// Work on shallow copies so nil pointers can be handled without writing
	// anything back into the caller's objects (the allocated value typically
	// points into a pod spec).
	var actuated, allocated v1.ResourceRequirements
	if actuatedPodResources != nil {
		actuated = *actuatedPodResources
	}
	if allocatedPodResources != nil {
		allocated = *allocatedPodResources
	}

	// Indexing a nil map is safe in Go and yields the zero Quantity, so no
	// placeholder maps are needed.
	return allocated.Requests[v1.ResourceCPU].Equal(actuated.Requests[v1.ResourceCPU]) &&
		allocated.Limits[v1.ResourceCPU].Equal(actuated.Limits[v1.ResourceCPU]) &&
		allocated.Requests[v1.ResourceMemory].Equal(actuated.Requests[v1.ResourceMemory]) &&
		allocated.Limits[v1.ResourceMemory].Equal(actuated.Limits[v1.ResourceMemory])
}
func isResizableContainer(container *v1.Container, containerType podutil.ContainerType) bool {
switch containerType {
case podutil.InitContainers:

View file

@ -4489,3 +4489,184 @@ func TestDoBackOff(t *testing.T) {
})
}
}
// TestCmpActuatedAllocated verifies the CPU/memory comparison between actuated
// and allocated pod-level resources, covering nil pointers, nil and empty
// resource lists, mismatched values, and resources that are not resizable.
func TestCmpActuatedAllocated(t *testing.T) {
	tests := []struct {
		name               string
		actuatedResources  *v1.ResourceRequirements
		allocatedResources *v1.ResourceRequirements
		cpuMemoryequal     bool
	}{
		{
			name:               "both nil",
			actuatedResources:  nil,
			allocatedResources: nil,
			cpuMemoryequal:     true,
		},
		{
			name:               "actuated nil and allocated empty",
			actuatedResources:  nil,
			allocatedResources: &v1.ResourceRequirements{},
			cpuMemoryequal:     true,
		},
		{
			name:               "actuated empty allocated nil",
			actuatedResources:  &v1.ResourceRequirements{},
			allocatedResources: nil,
			cpuMemoryequal:     true,
		},
		{
			name: "actuated empty and allocated empty",
			actuatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{},
				Limits:   v1.ResourceList{},
			},
			allocatedResources: &v1.ResourceRequirements{},
			cpuMemoryequal:     true,
		},
		{
			name:              "actuated empty and allocated requests limits empty",
			actuatedResources: &v1.ResourceRequirements{},
			allocatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{},
				Limits:   v1.ResourceList{},
			},
			cpuMemoryequal: true,
		},
		{
			name:              "actuated empty and allocated request limits nil",
			actuatedResources: &v1.ResourceRequirements{},
			allocatedResources: &v1.ResourceRequirements{
				Requests: nil,
				Limits:   nil,
			},
			cpuMemoryequal: true,
		},
		{
			name: "actuated requests limits nil and allocated empty",
			actuatedResources: &v1.ResourceRequirements{
				Requests: nil,
				Limits:   nil,
			},
			allocatedResources: &v1.ResourceRequirements{},
			cpuMemoryequal:     true,
		},
		{
			name: "actuated requests limits nil and allocated requests non-nil",
			actuatedResources: &v1.ResourceRequirements{
				Requests: nil,
				Limits:   nil,
			},
			allocatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceCPU: resource.MustParse("10m"),
				},
				Limits: nil,
			},
			cpuMemoryequal: false,
		},
		{
			name: "actuated requests limits nil and allocated requests non-nil limits nil",
			actuatedResources: &v1.ResourceRequirements{
				Requests: nil,
				Limits:   nil,
			},
			allocatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("10Mi"),
					v1.ResourceCPU:    resource.MustParse("10m"),
				},
				Limits: nil,
			},
			cpuMemoryequal: false,
		},
		{
			name: "actuated requests non-nil limits nil and allocated requests non-nil limits nil",
			actuatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("10Mi"),
					v1.ResourceCPU:    resource.MustParse("10m"),
				},
				Limits: nil,
			},
			allocatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("10Mi"),
					v1.ResourceCPU:    resource.MustParse("10m"),
				},
				Limits: nil,
			},
			cpuMemoryequal: true,
		},
		{
			name: "actuated requests non-nil limits nil and allocated requests nil limits non-nil",
			actuatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("10Mi"),
					v1.ResourceCPU:    resource.MustParse("10m"),
				},
				Limits: nil,
			},
			allocatedResources: &v1.ResourceRequirements{
				Requests: nil,
				Limits: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("10Mi"),
					v1.ResourceCPU:    resource.MustParse("10m"),
				},
			},
			cpuMemoryequal: false,
		},
		{
			name: "actuated equals allocated",
			actuatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("20Mi"),
					v1.ResourceCPU:    resource.MustParse("10m"),
				},
				Limits: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("10Mi"),
					v1.ResourceCPU:    resource.MustParse("20m"),
				},
			},
			allocatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("20Mi"),
					v1.ResourceCPU:    resource.MustParse("10m"),
				},
				Limits: v1.ResourceList{
					v1.ResourceMemory: resource.MustParse("10Mi"),
					v1.ResourceCPU:    resource.MustParse("20m"),
				},
			},
			cpuMemoryequal: true,
		},
		{
			// CPU and memory match while a non-resizable resource differs; only
			// CPU and memory take part in the comparison, so the result is equal.
			name: "actuated equals allocated for resizable resources",
			actuatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory:           resource.MustParse("20Mi"),
					v1.ResourceCPU:              resource.MustParse("10m"),
					v1.ResourceEphemeralStorage: resource.MustParse("1Gi"),
				},
				Limits: v1.ResourceList{
					v1.ResourceMemory:           resource.MustParse("10Mi"),
					v1.ResourceCPU:              resource.MustParse("20m"),
					v1.ResourceEphemeralStorage: resource.MustParse("1Gi"),
				},
			},
			allocatedResources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceMemory:           resource.MustParse("20Mi"),
					v1.ResourceCPU:              resource.MustParse("10m"),
					v1.ResourceEphemeralStorage: resource.MustParse("2Gi"),
				},
				Limits: v1.ResourceList{
					v1.ResourceMemory:           resource.MustParse("10Mi"),
					v1.ResourceCPU:              resource.MustParse("20m"),
					v1.ResourceEphemeralStorage: resource.MustParse("2Gi"),
				},
			},
			cpuMemoryequal: true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// cpuMemoryResourcesEqual is the comparison helper used by
			// isPodLevelResourcesResizeInProgress.
			gotEqual := cpuMemoryResourcesEqual(test.actuatedResources, test.allocatedResources)
			assert.Equal(t, test.cpuMemoryequal, gotEqual)
		})
	}
}