From 5cd3a4ffc68b9d8848f8f9664b46fb05bd907415 Mon Sep 17 00:00:00 2001 From: hoteye Date: Wed, 14 Jan 2026 19:45:12 +0800 Subject: [PATCH] kubelet: migrate kubelet_getters.go to contextual logging Migrate kubelet_getters.go and related functions to use contextual logging. Functions that only need logging receive logger klog.Logger directly instead of ctx context.Context. Changes: - getPodVolumePathListFromDisk: ctx -> logger - getMountedVolumePathListFromDisk: ctx -> logger - getPodVolumeSubpathListFromDisk: ctx -> logger - getLastObservedNodeAddresses: ctx -> logger - podVolumesExist: ctx -> logger - removeOrphanedPodVolumeDirs: ctx -> logger - cleanupOrphanedPodDirs: ctx -> logger - cleanupOrphanedPodCgroups: ctx -> logger Callers use klog.FromContext(ctx) to obtain logger from existing context. --- pkg/kubelet/kubelet.go | 6 ++++-- pkg/kubelet/kubelet_getters.go | 18 +++++++++--------- pkg/kubelet/kubelet_getters_test.go | 4 +++- pkg/kubelet/kubelet_pods.go | 4 ++-- pkg/kubelet/kubelet_volumes.go | 22 +++++++++++----------- pkg/kubelet/kubelet_volumes_linux_test.go | 7 +++++-- pkg/kubelet/kubelet_volumes_test.go | 6 +++--- 7 files changed, 37 insertions(+), 30 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a8c63db7404..1924d91e86e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -900,7 +900,9 @@ func NewMainKubelet(ctx context.Context, if kubeDeps.TLSOptions != nil { if kubeCfg.ServerTLSBootstrap && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) { - klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory) + klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, func() []v1.NodeAddress { + return klet.getLastObservedNodeAddresses(logger) + }, certDirectory) if err != nil { return nil, fmt.Errorf("failed to initialize certificate manager: %w", err) } @@ -2371,7 +2373,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus // This waiting loop relies on the background cleanup which starts after pod workers respond // true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed. if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { - volumesExist := kl.podVolumesExist(pod.UID) + volumesExist := kl.podVolumesExist(logger, pod.UID) if volumesExist { klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID) } diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go index 05087c9a15d..70554880d81 100644 --- a/pkg/kubelet/kubelet_getters.go +++ b/pkg/kubelet/kubelet_getters.go @@ -334,20 +334,20 @@ func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 { // getPodVolumePathListFromDisk returns a list of the volume paths by reading the // volume directories for the given pod from the disk. -func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, error) { +func (kl *Kubelet) getPodVolumePathListFromDisk(logger klog.Logger, podUID types.UID) ([]string, error) { volumes := []string{} podVolDir := kl.getPodVolumesDir(podUID) if pathExists, pathErr := mount.PathExists(podVolDir); pathErr != nil { return volumes, fmt.Errorf("error checking if path %q exists: %v", podVolDir, pathErr) } else if !pathExists { - klog.V(6).InfoS("Path does not exist", "path", podVolDir) + logger.V(6).Info("Path does not exist", "path", podVolDir) return volumes, nil } volumePluginDirs, err := os.ReadDir(podVolDir) if err != nil { - klog.ErrorS(err, "Could not read directory", "path", podVolDir) + logger.Error(err, "Could not read directory", "path", podVolDir) return volumes, err } for _, volumePluginDir := range volumePluginDirs { @@ -378,9 +378,9 @@ func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, err return volumes, nil } -func (kl *Kubelet) getMountedVolumePathListFromDisk(podUID types.UID) ([]string, error) { +func (kl *Kubelet) getMountedVolumePathListFromDisk(logger klog.Logger, podUID types.UID) ([]string, error) { mountedVolumes := []string{} - volumePaths, err := kl.getPodVolumePathListFromDisk(podUID) + volumePaths, err := kl.getPodVolumePathListFromDisk(logger, podUID) if err != nil { return mountedVolumes, err } @@ -403,7 +403,7 @@ func (kl *Kubelet) getMountedVolumePathListFromDisk(podUID types.UID) ([]string, // getPodVolumeSubpathListFromDisk returns a list of the volume-subpath paths by reading the // subpath directories for the given pod from the disk. -func (kl *Kubelet) getPodVolumeSubpathListFromDisk(podUID types.UID) ([]string, error) { +func (kl *Kubelet) getPodVolumeSubpathListFromDisk(logger klog.Logger, podUID types.UID) ([]string, error) { volumes := []string{} podSubpathsDir := kl.getPodVolumeSubpathsDir(podUID) @@ -416,7 +416,7 @@ func (kl *Kubelet) getPodVolumeSubpathListFromDisk(podUID types.UID) ([]string, // Explicitly walks /// volumePluginDirs, err := os.ReadDir(podSubpathsDir) if err != nil { - klog.ErrorS(err, "Could not read directory", "path", podSubpathsDir) + logger.Error(err, "Could not read directory", "path", podSubpathsDir) return volumes, err } for _, volumePluginDir := range volumePluginDirs { @@ -467,10 +467,10 @@ func (kl *Kubelet) setCachedMachineInfo(info *cadvisorapiv1.MachineInfo) { } // getLastStableNodeAddresses returns the last observed node addresses. -func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress { +func (kl *Kubelet) getLastObservedNodeAddresses(logger klog.Logger) []v1.NodeAddress { node, err := kl.GetNode() if err != nil || node == nil { - klog.V(4).InfoS("fail to obtain node from local cache", "node", kl.nodeName, "error", err) + logger.V(4).Info("fail to obtain node from local cache", "node", kl.nodeName, "error", err) return nil } return node.Status.Addresses diff --git a/pkg/kubelet/kubelet_getters_test.go b/pkg/kubelet/kubelet_getters_test.go index abd08432b86..d9710ace39f 100644 --- a/pkg/kubelet/kubelet_getters_test.go +++ b/pkg/kubelet/kubelet_getters_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" cloudproviderapi "k8s.io/cloud-provider/api" + "k8s.io/klog/v2/ktesting" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) @@ -285,6 +286,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -295,7 +297,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) { nodeLister.nodes = append(nodeLister.nodes, tc.node) } kl.nodeLister = nodeLister - addrs := kl.getLastObservedNodeAddresses() + addrs := kl.getLastObservedNodeAddresses(logger) if len(addrs) != len(tc.expectedAddrs) { t.Errorf("expected %d addresses, got %d", len(tc.expectedAddrs), len(addrs)) diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index c8b57400204..83b8f696e1d 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1293,7 +1293,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // in the future (volumes, mount dirs, logs, and containers could all be // better separated) logger.V(3).Info("Clean up orphaned pod directories") - err = kl.cleanupOrphanedPodDirs(allPods, runningRuntimePods) + err = kl.cleanupOrphanedPodDirs(logger, allPods, runningRuntimePods) if err != nil { // We want all cleanup tasks to be run even if one of them failed. So // we just log an error here and continue other cleanup tasks. @@ -2842,7 +2842,7 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(logger klog.Logger, pcm cm.PodConta // so any memory backed volumes don't have their charges propagated to the // parent croup. If the volumes still exist, reduce the cpu shares for any // process in the cgroup to the minimum value while we wait. - if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist { + if podVolumesExist := kl.podVolumesExist(logger, uid); podVolumesExist { logger.V(3).Info("Orphaned pod found, but volumes not yet removed. Reducing cpu to minimum", "podUID", uid) if err := pcm.ReduceCPULimits(logger, val); err != nil { logger.Info("Failed to reduce cpu time for pod pending volume cleanup", "podUID", uid, "err", err) diff --git a/pkg/kubelet/kubelet_volumes.go b/pkg/kubelet/kubelet_volumes.go index 9e0bd818490..199dcb28e33 100644 --- a/pkg/kubelet/kubelet_volumes.go +++ b/pkg/kubelet/kubelet_volumes.go @@ -75,20 +75,20 @@ func (kl *Kubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.B // podVolumesExist checks with the volume manager and returns true any of the // pods for the specified volume are mounted or are uncertain. -func (kl *Kubelet) podVolumesExist(podUID types.UID) bool { +func (kl *Kubelet) podVolumesExist(logger klog.Logger, podUID types.UID) bool { if kl.volumeManager.HasPossiblyMountedVolumesForPod(volumetypes.UniquePodName(podUID)) { return true } // TODO: This checks pod volume paths and whether they are mounted. If checking returns error, podVolumesExist will return true // which means we consider volumes might exist and requires further checking. // There are some volume plugins such as flexvolume might not have mounts. See issue #61229 - volumePaths, err := kl.getMountedVolumePathListFromDisk(podUID) + volumePaths, err := kl.getMountedVolumePathListFromDisk(logger, podUID) if err != nil { - klog.ErrorS(err, "Pod found, but error occurred during checking mounted volumes from disk", "podUID", podUID) + logger.Error(err, "Pod found, but error occurred during checking mounted volumes from disk", "podUID", podUID) return true } if len(volumePaths) > 0 { - klog.V(4).InfoS("Pod found, but volumes are still mounted on disk", "podUID", podUID, "paths", volumePaths) + logger.V(4).Info("Pod found, but volumes are still mounted on disk", "podUID", podUID, "paths", volumePaths) return true } @@ -116,11 +116,11 @@ func (kl *Kubelet) newVolumeMounterFromPlugins(spec *volume.Spec, pod *v1.Pod) ( // when this is called, so it effectively does a recursive rmdir instead of // RemoveAll to ensure it only removes empty directories and files that were // used as mount points, but not content of the mount points. -func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error { +func (kl *Kubelet) removeOrphanedPodVolumeDirs(logger klog.Logger, uid types.UID) []error { orphanVolumeErrors := []error{} // If there are still volume directories, attempt to rmdir them - volumePaths, err := kl.getPodVolumePathListFromDisk(uid) + volumePaths, err := kl.getPodVolumePathListFromDisk(logger, uid) if err != nil { orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error occurred during reading volume dir from disk: %v", uid, err)) return orphanVolumeErrors @@ -136,7 +136,7 @@ func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error { } // If there are any volume-subpaths, attempt to remove them - subpathVolumePaths, err := kl.getPodVolumeSubpathListFromDisk(uid) + subpathVolumePaths, err := kl.getPodVolumeSubpathListFromDisk(logger, uid) if err != nil { orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error occurred during reading of volume-subpaths dir from disk: %v", uid, err)) return orphanVolumeErrors @@ -166,7 +166,7 @@ func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error { // cleanupOrphanedPodDirs removes the volumes of pods that should not be // running and that have no containers running. Note that we roll up logs here since it runs in the main loop. -func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecontainer.Pod) error { +func (kl *Kubelet) cleanupOrphanedPodDirs(logger klog.Logger, pods []*v1.Pod, runningPods []*kubecontainer.Pod) error { allPods := sets.New[string]() for _, pod := range pods { allPods.Insert(string(pod.UID)) @@ -195,14 +195,14 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon // Doing so may result in corruption of data. // TODO: getMountedVolumePathListFromDisk() call may be redundant with // kl.getPodVolumePathListFromDisk(). Can this be cleaned up? - if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist { + if podVolumesExist := kl.podVolumesExist(logger, uid); podVolumesExist { errorPods++ - klog.V(3).InfoS("Orphaned pod found, but volumes are not cleaned up", "podUID", uid) + logger.V(3).Info("Orphaned pod found, but volumes are not cleaned up", "podUID", uid) continue } // Attempt to remove the pod volumes directory and its subdirectories - podVolumeErrors := kl.removeOrphanedPodVolumeDirs(uid) + podVolumeErrors := kl.removeOrphanedPodVolumeDirs(logger, uid) if len(podVolumeErrors) > 0 { errorPods++ orphanVolumeErrors = append(orphanVolumeErrors, podVolumeErrors...) diff --git a/pkg/kubelet/kubelet_volumes_linux_test.go b/pkg/kubelet/kubelet_volumes_linux_test.go index 96e2e698355..5b5c23adfb9 100644 --- a/pkg/kubelet/kubelet_volumes_linux_test.go +++ b/pkg/kubelet/kubelet_volumes_linux_test.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2/ktesting" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/mount-utils" ) @@ -170,6 +171,7 @@ func TestCleanupOrphanedPodDirs(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -180,7 +182,7 @@ func TestCleanupOrphanedPodDirs(t *testing.T) { } } - err := kubelet.cleanupOrphanedPodDirs(tc.pods, nil) + err := kubelet.cleanupOrphanedPodDirs(logger, tc.pods, nil) if tc.expectErr && err == nil { t.Errorf("%s failed: expected error, got success", name) } @@ -292,6 +294,7 @@ func TestPodVolumesExistWithMount(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -302,7 +305,7 @@ func TestPodVolumesExistWithMount(t *testing.T) { } } - exist := kubelet.podVolumesExist(poduid) + exist := kubelet.podVolumesExist(logger, poduid) if tc.expected != exist { t.Errorf("%s failed: expected %t, got %t", name, tc.expected, exist) } diff --git a/pkg/kubelet/kubelet_volumes_test.go b/pkg/kubelet/kubelet_volumes_test.go index 4f8bf7c45ef..b1539bd613e 100644 --- a/pkg/kubelet/kubelet_volumes_test.go +++ b/pkg/kubelet/kubelet_volumes_test.go @@ -198,18 +198,18 @@ func TestPodVolumesExist(t *testing.T) { }, } - tCtx := ktesting.Init(t) + logger, tCtx := ktesting.NewTestContext(t) defer tCtx.Cancel("test has completed") go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady) kubelet.podManager.SetPods(pods) for _, pod := range pods { - err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod) + err := kubelet.volumeManager.WaitForAttachAndMount(tCtx, pod) assert.NoError(t, err) } for _, pod := range pods { - podVolumesExist := kubelet.podVolumesExist(pod.UID) + podVolumesExist := kubelet.podVolumesExist(logger, pod.UID) assert.True(t, podVolumesExist, "pod %q", pod.UID) } }