mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-11 01:41:54 -04:00
kubelet: migrate allocation to contextual logging
This commit is contained in:
parent
c245b40b87
commit
4822f77879
6 changed files with 105 additions and 56 deletions
|
|
@ -139,7 +139,8 @@ type manager struct {
|
|||
recorder record.EventRecorder
|
||||
}
|
||||
|
||||
func NewManager(checkpointDirectory string,
|
||||
func NewManager(
|
||||
checkpointDirectory string,
|
||||
nodeConfig cm.NodeConfig,
|
||||
nodeAllocatableAbsolute v1.ResourceList,
|
||||
statusManager status.Manager,
|
||||
|
|
@ -149,8 +150,11 @@ func NewManager(checkpointDirectory string,
|
|||
sourcesReady config.SourcesReady,
|
||||
recorder record.EventRecorder,
|
||||
) Manager {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
return &manager{
|
||||
allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile),
|
||||
allocated: newStateImpl(logger, checkpointDirectory, allocatedPodsStateFile),
|
||||
|
||||
statusManager: statusManager,
|
||||
admitHandlers: lifecycle.PodAdmitHandlers{},
|
||||
|
|
@ -166,7 +170,7 @@ func NewManager(checkpointDirectory string,
|
|||
}
|
||||
}
|
||||
|
||||
func newStateImpl(checkpointDirectory, checkpointName string) state.State {
|
||||
func newStateImpl(logger klog.Logger, checkpointDirectory, checkpointName string) state.State {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
|
||||
return state.NewNoopStateCheckpoint()
|
||||
}
|
||||
|
|
@ -174,7 +178,7 @@ func newStateImpl(checkpointDirectory, checkpointName string) state.State {
|
|||
stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, checkpointName)
|
||||
if err != nil {
|
||||
// This is a critical, non-recoverable failure.
|
||||
klog.ErrorS(err, "Failed to initialize allocation checkpoint manager",
|
||||
logger.Error(err, "Failed to initialize allocation checkpoint manager",
|
||||
"checkpointPath", filepath.Join(checkpointDirectory, checkpointName))
|
||||
panic(err)
|
||||
}
|
||||
|
|
@ -184,7 +188,8 @@ func newStateImpl(checkpointDirectory, checkpointName string) state.State {
|
|||
|
||||
// NewInMemoryManager returns an allocation manager that doesn't persist state.
|
||||
// For testing purposes only!
|
||||
func NewInMemoryManager(nodeConfig cm.NodeConfig,
|
||||
func NewInMemoryManager(
|
||||
nodeConfig cm.NodeConfig,
|
||||
nodeAllocatableAbsolute v1.ResourceList,
|
||||
statusManager status.Manager,
|
||||
triggerPodSync func(pod *v1.Pod),
|
||||
|
|
@ -213,12 +218,13 @@ func NewInMemoryManager(nodeConfig cm.NodeConfig,
|
|||
func (m *manager) Run(ctx context.Context) {
|
||||
// Start a goroutine to periodically check for pending resizes and process them if needed.
|
||||
go func() {
|
||||
logger := klog.FromContext(ctx)
|
||||
for {
|
||||
select {
|
||||
case <-m.ticker.C:
|
||||
successfulResizes := m.retryPendingResizes(triggerReasonPeriodic)
|
||||
successfulResizes := m.retryPendingResizes(logger, triggerReasonPeriodic)
|
||||
for _, po := range successfulResizes {
|
||||
klog.InfoS("Successfully retried resize after timeout", "pod", klog.KObj(po))
|
||||
logger.Info("Successfully retried resize after timeout", "pod", klog.KObj(po))
|
||||
}
|
||||
case <-ctx.Done():
|
||||
m.ticker.Stop()
|
||||
|
|
@ -229,15 +235,18 @@ func (m *manager) Run(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (m *manager) RetryPendingResizes(trigger string) {
|
||||
m.retryPendingResizes(trigger)
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
m.retryPendingResizes(logger, trigger)
|
||||
}
|
||||
|
||||
func (m *manager) retryPendingResizes(trigger string) []*v1.Pod {
|
||||
func (m *manager) retryPendingResizes(logger klog.Logger, trigger string) []*v1.Pod {
|
||||
m.allocationMutex.Lock()
|
||||
defer m.allocationMutex.Unlock()
|
||||
|
||||
if !m.sourcesReady.AllReady() {
|
||||
klog.V(4).InfoS("Skipping evaluation of pending resizes; sources are not ready")
|
||||
logger.V(4).Info("Skipping evaluation of pending resizes; sources are not ready")
|
||||
m.ticker.Reset(initialRetryDelay)
|
||||
return nil
|
||||
}
|
||||
|
|
@ -251,25 +260,25 @@ func (m *manager) retryPendingResizes(trigger string) []*v1.Pod {
|
|||
for _, uid := range m.podsWithPendingResizes {
|
||||
pod, found := m.getPodByUID(uid)
|
||||
if !found {
|
||||
klog.V(4).InfoS("Pod not found; removing from pending resizes", "podUID", uid)
|
||||
logger.V(4).Info("Pod not found; removing from pending resizes", "podUID", uid)
|
||||
continue
|
||||
}
|
||||
|
||||
oldResizeStatus := m.statusManager.GetPodResizeConditions(uid)
|
||||
isDeferred := m.statusManager.IsPodResizeDeferred(uid)
|
||||
|
||||
resizeAllocated, err := m.handlePodResourcesResize(pod)
|
||||
resizeAllocated, err := m.handlePodResourcesResize(logger, pod)
|
||||
switch {
|
||||
case err != nil:
|
||||
klog.ErrorS(err, "Failed to handle pod resources resize", "pod", klog.KObj(pod))
|
||||
logger.Error(err, "Failed to handle pod resources resize", "pod", klog.KObj(pod))
|
||||
newPendingResizes = append(newPendingResizes, uid)
|
||||
case m.statusManager.IsPodResizeDeferred(uid):
|
||||
klog.V(4).InfoS("Pod resize is deferred; will reevaluate later", "pod", klog.KObj(pod))
|
||||
logger.V(4).Info("Pod resize is deferred; will reevaluate later", "pod", klog.KObj(pod))
|
||||
newPendingResizes = append(newPendingResizes, uid)
|
||||
case m.statusManager.IsPodResizeInfeasible(uid):
|
||||
klog.V(4).InfoS("Pod resize is infeasible", "pod", klog.KObj(pod))
|
||||
logger.V(4).Info("Pod resize is infeasible", "pod", klog.KObj(pod))
|
||||
default:
|
||||
klog.V(4).InfoS("Pod resize successfully allocated", "pod", klog.KObj(pod))
|
||||
logger.V(4).Info("Pod resize successfully allocated", "pod", klog.KObj(pod))
|
||||
successfulResizes = append(successfulResizes, pod)
|
||||
if isDeferred {
|
||||
metrics.PodDeferredAcceptedResizes.WithLabelValues(trigger).Inc()
|
||||
|
|
@ -288,6 +297,9 @@ func (m *manager) retryPendingResizes(trigger string) []*v1.Pod {
|
|||
}
|
||||
|
||||
func (m *manager) PushPendingResize(uid types.UID) {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
m.allocationMutex.Lock()
|
||||
defer m.allocationMutex.Unlock()
|
||||
|
||||
|
|
@ -300,7 +312,7 @@ func (m *manager) PushPendingResize(uid types.UID) {
|
|||
|
||||
// Add the pod to the pending resizes list and sort by priority.
|
||||
m.podsWithPendingResizes = append(m.podsWithPendingResizes, uid)
|
||||
m.sortPendingResizes()
|
||||
m.sortPendingResizes(logger)
|
||||
}
|
||||
|
||||
// sortPendingResizes sorts the list of pending resizes:
|
||||
|
|
@ -308,12 +320,12 @@ func (m *manager) PushPendingResize(uid types.UID) {
|
|||
// - Second, based on the pod's PriorityClass.
|
||||
// - Third, based on the pod's QoS class.
|
||||
// - Last, prioritizing resizes that have been in the deferred state the longest.
|
||||
func (m *manager) sortPendingResizes() {
|
||||
func (m *manager) sortPendingResizes(logger klog.Logger) {
|
||||
var pendingPods []*v1.Pod
|
||||
for _, uid := range m.podsWithPendingResizes {
|
||||
pod, found := m.getPodByUID(uid)
|
||||
if !found {
|
||||
klog.V(4).InfoS("Pod not found; removing from pending resizes", "podUID", uid)
|
||||
logger.V(4).Info("Pod not found; removing from pending resizes", "podUID", uid)
|
||||
continue
|
||||
}
|
||||
pendingPods = append(pendingPods, pod)
|
||||
|
|
@ -490,7 +502,10 @@ func updatePodFromAllocation(pod *v1.Pod, allocated state.PodResourceInfo) (*v1.
|
|||
|
||||
// SetAllocatedResources checkpoints the resources allocated to a pod's containers
|
||||
func (m *manager) SetAllocatedResources(pod *v1.Pod) error {
|
||||
return m.allocated.SetPodResourceInfo(pod.UID, allocationFromPod(pod))
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
return m.allocated.SetPodResourceInfo(logger, pod.UID, allocationFromPod(pod))
|
||||
}
|
||||
|
||||
func allocationFromPod(pod *v1.Pod) state.PodResourceInfo {
|
||||
|
|
@ -525,6 +540,9 @@ func (m *manager) SetContainerRuntime(runtime kubecontainer.Runtime) {
|
|||
}
|
||||
|
||||
func (m *manager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
m.allocationMutex.Lock()
|
||||
defer m.allocationMutex.Unlock()
|
||||
|
||||
|
|
@ -536,13 +554,13 @@ func (m *manager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, strin
|
|||
|
||||
// Check if we can admit the pod; if so, update the allocation.
|
||||
allocatedPods := m.getAllocatedPods(activePods)
|
||||
ok, reason, message := m.canAdmitPod(allocatedPods, pod)
|
||||
ok, reason, message := m.canAdmitPod(logger, allocatedPods, pod)
|
||||
|
||||
if ok && utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
|
||||
// Checkpoint the resource values at which the Pod has been admitted or resized.
|
||||
if err := m.SetAllocatedResources(pod); err != nil {
|
||||
// TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
|
||||
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
|
||||
logger.Error(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -550,9 +568,12 @@ func (m *manager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, strin
|
|||
}
|
||||
|
||||
func (m *manager) RemovePod(uid types.UID) {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
if err := m.allocated.RemovePod(uid); err != nil {
|
||||
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
|
||||
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
|
||||
logger.V(3).Error(err, "Failed to delete pod allocation", "podUID", uid)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -560,7 +581,7 @@ func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
|
|||
m.allocated.RemoveOrphanedPods(remainingPods)
|
||||
}
|
||||
|
||||
func (m *manager) handlePodResourcesResize(pod *v1.Pod) (bool, error) {
|
||||
func (m *manager) handlePodResourcesResize(logger klog.Logger, pod *v1.Pod) (bool, error) {
|
||||
allocatedPod, updated := m.UpdatePodFromAllocation(pod)
|
||||
if !updated {
|
||||
// Desired resources == allocated resources. Pod allocation does not need to be updated.
|
||||
|
|
@ -590,7 +611,7 @@ func (m *manager) handlePodResourcesResize(pod *v1.Pod) (bool, error) {
|
|||
}
|
||||
|
||||
// Desired resources != allocated resources. Can we update the allocation to the desired resources?
|
||||
fit, reason, message := m.canResizePod(m.getAllocatedPods(m.getActivePods()), pod)
|
||||
fit, reason, message := m.canResizePod(logger, m.getAllocatedPods(m.getActivePods()), pod)
|
||||
if fit {
|
||||
// Update pod resource allocation checkpoint
|
||||
if err := m.SetAllocatedResources(pod); err != nil {
|
||||
|
|
@ -665,7 +686,7 @@ func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desired
|
|||
// the pod cannot be admitted.
|
||||
// allocatedPods should represent the pods that have already been admitted, along with their
|
||||
// admitted (allocated) resources.
|
||||
func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
|
||||
func (m *manager) canAdmitPod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
|
||||
// Filter out the pod being evaluated.
|
||||
allocatedPods = slices.DeleteFunc(allocatedPods, func(p *v1.Pod) bool { return p.UID == pod.UID })
|
||||
|
||||
|
|
@ -673,7 +694,7 @@ func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, strin
|
|||
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: allocatedPods}
|
||||
for _, podAdmitHandler := range m.admitHandlers {
|
||||
if result := podAdmitHandler.Admit(attrs); !result.Admit {
|
||||
klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
|
||||
logger.Info("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
|
||||
return false, result.Reason, result.Message
|
||||
}
|
||||
}
|
||||
|
|
@ -685,7 +706,7 @@ func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, strin
|
|||
// pod should hold the desired (pre-allocated) spec.
|
||||
// Returns true if the resize can proceed; returns a reason and message
|
||||
// otherwise.
|
||||
func (m *manager) canResizePod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
|
||||
func (m *manager) canResizePod(logger klog.Logger, allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
|
||||
// TODO: Move this logic into a PodAdmitHandler by introducing an operation field to
|
||||
// lifecycle.PodAdmitAttributes, and combine canResizePod with canAdmitPod.
|
||||
if v1qos.GetPodQOS(pod) == v1.PodQOSGuaranteed {
|
||||
|
|
@ -693,7 +714,7 @@ func (m *manager) canResizePod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, stri
|
|||
m.nodeConfig.CPUManagerPolicy == string(cpumanager.PolicyStatic) &&
|
||||
m.guaranteedPodResourceResizeRequired(pod, v1.ResourceCPU) {
|
||||
msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside CPU Manager policy \"%s\"", string(cpumanager.PolicyStatic))
|
||||
klog.V(3).InfoS(msg, "pod", format.Pod(pod))
|
||||
logger.V(3).Info(msg, "pod", format.Pod(pod))
|
||||
metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_cpu_manager_static_policy").Inc()
|
||||
return false, v1.PodReasonInfeasible, msg
|
||||
}
|
||||
|
|
@ -702,7 +723,7 @@ func (m *manager) canResizePod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, stri
|
|||
m.nodeConfig.MemoryManagerPolicy == string(memorymanager.PolicyTypeStatic) &&
|
||||
m.guaranteedPodResourceResizeRequired(pod, v1.ResourceMemory) {
|
||||
msg := fmt.Sprintf("Resize is infeasible for Guaranteed Pods alongside Memory Manager policy \"%s\"", string(memorymanager.PolicyTypeStatic))
|
||||
klog.V(3).InfoS(msg, "pod", format.Pod(pod))
|
||||
logger.V(3).Info(msg, "pod", format.Pod(pod))
|
||||
metrics.PodInfeasibleResizes.WithLabelValues("guaranteed_pod_memory_manager_static_policy").Inc()
|
||||
return false, v1.PodReasonInfeasible, msg
|
||||
}
|
||||
|
|
@ -720,14 +741,14 @@ func (m *manager) canResizePod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, stri
|
|||
msg = fmt.Sprintf("cpu, requested: %d, capacity: %d", cpuRequests, cpuAvailable)
|
||||
}
|
||||
msg = "Node didn't have enough capacity: " + msg
|
||||
klog.V(3).InfoS(msg, "pod", klog.KObj(pod))
|
||||
logger.V(3).Info(msg, "pod", klog.KObj(pod))
|
||||
metrics.PodInfeasibleResizes.WithLabelValues("insufficient_node_allocatable").Inc()
|
||||
return false, v1.PodReasonInfeasible, msg
|
||||
}
|
||||
|
||||
if ok, failReason, failMessage := m.canAdmitPod(allocatedPods, pod); !ok {
|
||||
if ok, failReason, failMessage := m.canAdmitPod(logger, allocatedPods, pod); !ok {
|
||||
// Log reason and return.
|
||||
klog.V(3).InfoS("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage)
|
||||
logger.V(3).Info("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage)
|
||||
return false, v1.PodReasonDeferred, failMessage
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ import (
|
|||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
|
||||
_ "k8s.io/kubernetes/pkg/volume/hostpath"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
|
|
@ -2345,6 +2346,7 @@ func TestRecordPodDeferredAcceptedResizes(t *testing.T) {
|
|||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
original := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: "1111",
|
||||
|
|
@ -2387,7 +2389,7 @@ func TestRecordPodDeferredAcceptedResizes(t *testing.T) {
|
|||
return resizedPod, true
|
||||
}
|
||||
am.PushPendingResize(original.UID)
|
||||
resizedPods := am.(*manager).retryPendingResizes(tc.trigger)
|
||||
resizedPods := am.(*manager).retryPendingResizes(logger, tc.trigger)
|
||||
|
||||
require.Len(t, resizedPods, 1)
|
||||
require.Equal(t, original.UID, resizedPods[0].UID)
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import (
|
|||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// PodResourceInfo stores resource requirements for containers within a pod.
|
||||
|
|
@ -58,7 +59,7 @@ type Reader interface {
|
|||
|
||||
type writer interface {
|
||||
SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error
|
||||
SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error
|
||||
SetPodResourceInfo(logger klog.Logger, podUID types.UID, resourceInfo PodResourceInfo) error
|
||||
SetPodLevelResources(podUID types.UID, alloc *v1.ResourceRequirements) error
|
||||
RemovePod(podUID types.UID) error
|
||||
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
|
||||
|
|
|
|||
|
|
@ -42,12 +42,15 @@ type stateCheckpoint struct {
|
|||
|
||||
// NewStateCheckpoint creates new State for keeping track of pod resource information with checkpoint backend
|
||||
func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod resource information tracking: %w", err)
|
||||
}
|
||||
|
||||
pra, checksum, err := restoreState(checkpointManager, checkpointName)
|
||||
pra, checksum, err := restoreState(logger, checkpointManager, checkpointName)
|
||||
if err != nil {
|
||||
//lint:ignore ST1005 user-facing error message
|
||||
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod resource information checkpoint file %q before restarting Kubelet",
|
||||
|
|
@ -64,7 +67,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
|
|||
}
|
||||
|
||||
// restores state from a checkpoint and creates it if it doesn't exist
|
||||
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceInfoMap, checksum.Checksum, error) {
|
||||
func restoreState(logger klog.Logger, checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceInfoMap, checksum.Checksum, error) {
|
||||
checkpoint := &Checkpoint{}
|
||||
if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
|
||||
if err == errors.ErrCheckpointNotFound {
|
||||
|
|
@ -78,12 +81,12 @@ func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpo
|
|||
return nil, 0, fmt.Errorf("failed to get pod resource information: %w", err)
|
||||
}
|
||||
|
||||
klog.V(2).InfoS("State checkpoint: restored pod resource state from checkpoint")
|
||||
logger.V(2).Info("State checkpoint: restored pod resource state from checkpoint")
|
||||
return praInfo.Entries, checkpoint.Checksum, nil
|
||||
}
|
||||
|
||||
// saves state to a checkpoint, caller is responsible for locking
|
||||
func (sc *stateCheckpoint) storeState() error {
|
||||
func (sc *stateCheckpoint) storeState(logger klog.Logger) error {
|
||||
resourceInfo := sc.cache.GetPodResourceInfoMap()
|
||||
|
||||
checkpoint, err := NewCheckpoint(&PodResourceCheckpointInfo{
|
||||
|
|
@ -98,7 +101,7 @@ func (sc *stateCheckpoint) storeState() error {
|
|||
}
|
||||
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to save pod resource information checkpoint")
|
||||
logger.Error(err, "Failed to save pod resource information checkpoint")
|
||||
return err
|
||||
}
|
||||
sc.lastChecksum = checkpoint.Checksum
|
||||
|
|
@ -135,35 +138,41 @@ func (sc *stateCheckpoint) GetPodResourceInfo(podUID types.UID) (PodResourceInfo
|
|||
|
||||
// SetContainerResoruces sets resources information for a pod's container
|
||||
func (sc *stateCheckpoint) SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
err := sc.cache.SetContainerResources(podUID, containerName, resources)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sc.storeState()
|
||||
return sc.storeState(logger)
|
||||
}
|
||||
|
||||
// SetPodLevelResources sets resources information for a pod's resources at pod-level.
|
||||
func (sc *stateCheckpoint) SetPodLevelResources(podUID types.UID, resInfo *v1.ResourceRequirements) error {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
err := sc.cache.SetPodLevelResources(podUID, resInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sc.storeState()
|
||||
return sc.storeState(logger)
|
||||
}
|
||||
|
||||
// SetPodResourceInfo sets pod resource information
|
||||
func (sc *stateCheckpoint) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
|
||||
func (sc *stateCheckpoint) SetPodResourceInfo(logger klog.Logger, podUID types.UID, resourceInfo PodResourceInfo) error {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
err := sc.cache.SetPodResourceInfo(podUID, resourceInfo)
|
||||
err := sc.cache.SetPodResourceInfo(logger, podUID, resourceInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sc.storeState()
|
||||
return sc.storeState(logger)
|
||||
}
|
||||
|
||||
// Delete deletes resource information for specified pod
|
||||
|
|
@ -213,7 +222,7 @@ func (sc *noopStateCheckpoint) SetPodLevelResources(_ types.UID, _ *v1.ResourceR
|
|||
return nil
|
||||
}
|
||||
|
||||
func (sc *noopStateCheckpoint) SetPodResourceInfo(_ types.UID, _ PodResourceInfo) error {
|
||||
func (sc *noopStateCheckpoint) SetPodResourceInfo(_ klog.Logger, _ types.UID, _ PodResourceInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
)
|
||||
|
||||
|
|
@ -115,12 +116,13 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
|
|||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testDir := getTestDir(t)
|
||||
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
|
||||
require.NoError(t, err)
|
||||
|
||||
for podUID, alloc := range tt.args.resInfoMap {
|
||||
err = originalSC.SetPodResourceInfo(podUID, alloc)
|
||||
err = originalSC.SetPodResourceInfo(logger, podUID, alloc)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
@ -139,12 +141,12 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
|
|||
|
||||
// Setting the pod allocations to the same values should not re-write the checkpoint.
|
||||
for podUID, alloc := range tt.args.resInfoMap {
|
||||
require.NoError(t, originalSC.SetPodResourceInfo(podUID, alloc))
|
||||
require.NoError(t, originalSC.SetPodResourceInfo(logger, podUID, alloc))
|
||||
require.NoFileExists(t, checkpointPath, "checkpoint should not be re-written")
|
||||
}
|
||||
|
||||
// Setting a new value should update the checkpoint.
|
||||
require.NoError(t, originalSC.SetPodResourceInfo("foo-bar", PodResourceInfo{
|
||||
require.NoError(t, originalSC.SetPodResourceInfo(logger, "foo-bar", PodResourceInfo{
|
||||
ContainerResources: map[string]v1.ResourceRequirements{
|
||||
"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
|
||||
},
|
||||
|
|
@ -156,6 +158,7 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
// Based on the PodResourceAllocationInfo struct, it's mostly possible that new field will be added
|
||||
// in struct PodResourceAllocationInfo, rather than in struct PodResourceAllocationInfo.AllocationEntries.
|
||||
// Emulate upgrade scenario by pretending that `ResizeStatusEntries` is a new field.
|
||||
|
|
@ -184,7 +187,7 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
|
|||
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
|
||||
require.NoError(t, err, "failed to create old checkpoint")
|
||||
|
||||
actualPodResourceAllocation, _, err := restoreState(sc.checkpointManager, sc.checkpointName)
|
||||
actualPodResourceAllocation, _, err := restoreState(logger, sc.checkpointManager, sc.checkpointName)
|
||||
require.NoError(t, err, "failed to restore state")
|
||||
|
||||
require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal")
|
||||
|
|
|
|||
|
|
@ -34,10 +34,13 @@ var _ State = &stateMemory{}
|
|||
|
||||
// NewStateMemory creates new State to track resources resourcesated to pods
|
||||
func NewStateMemory(resources PodResourceInfoMap) State {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
if resources == nil {
|
||||
resources = PodResourceInfoMap{}
|
||||
}
|
||||
klog.V(2).InfoS("Initialized new in-memory state store for pod resource information tracking")
|
||||
logger.V(2).Info("Initialized new in-memory state store for pod resource information tracking")
|
||||
return &stateMemory{
|
||||
podResources: resources,
|
||||
}
|
||||
|
|
@ -87,6 +90,10 @@ func (s *stateMemory) GetPodResourceInfo(podUID types.UID) (PodResourceInfo, boo
|
|||
}
|
||||
|
||||
func (s *stateMemory) SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
|
|
@ -104,11 +111,14 @@ func (s *stateMemory) SetContainerResources(podUID types.UID, containerName stri
|
|||
podInfo.ContainerResources[containerName] = resources
|
||||
s.podResources[podUID] = podInfo
|
||||
|
||||
klog.V(3).InfoS("Updated container resource information", "podUID", podUID, "containerName", containerName, "resources", resources)
|
||||
logger.V(3).Info("Updated container resource information", "podUID", podUID, "containerName", containerName, "resources", resources)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stateMemory) SetPodLevelResources(podUID types.UID, resources *v1.ResourceRequirements) error {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
|
|
@ -121,24 +131,27 @@ func (s *stateMemory) SetPodLevelResources(podUID types.UID, resources *v1.Resou
|
|||
|
||||
s.podResources[podUID] = podInfo
|
||||
|
||||
klog.V(3).InfoS("Updated pod-level resource info", "podUID", podUID, "resources", resources)
|
||||
logger.V(3).Info("Updated pod-level resource info", "podUID", podUID, "resources", resources)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stateMemory) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
|
||||
func (s *stateMemory) SetPodResourceInfo(logger klog.Logger, podUID types.UID, resourceInfo PodResourceInfo) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.podResources[podUID] = resourceInfo
|
||||
klog.V(3).InfoS("Updated pod resource information", "podUID", podUID, "information", resourceInfo)
|
||||
logger.V(3).Info("Updated pod resource information", "podUID", podUID, "information", resourceInfo)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stateMemory) RemovePod(podUID types.UID) error {
|
||||
// Use klog.TODO() because we currently do not have a proper logger to pass in.
|
||||
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
|
||||
logger := klog.TODO()
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
delete(s.podResources, podUID)
|
||||
klog.V(3).InfoS("Deleted pod resource information", "podUID", podUID)
|
||||
logger.V(3).Info("Deleted pod resource information", "podUID", podUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue