mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-09 08:55:55 -04:00
Merge pull request #136186 from hoteye/migrate-kubelet-getters-contextual-logging
kubelet: migrate kubelet_getters.go to contextual logging
This commit is contained in:
commit
e51e40ede6
7 changed files with 37 additions and 30 deletions
|
|
@ -900,7 +900,9 @@ func NewMainKubelet(ctx context.Context,
|
|||
|
||||
if kubeDeps.TLSOptions != nil {
|
||||
if kubeCfg.ServerTLSBootstrap && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
|
||||
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
|
||||
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, func() []v1.NodeAddress {
|
||||
return klet.getLastObservedNodeAddresses(logger)
|
||||
}, certDirectory)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize certificate manager: %w", err)
|
||||
}
|
||||
|
|
@ -2371,7 +2373,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
|
|||
// This waiting loop relies on the background cleanup which starts after pod workers respond
|
||||
// true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed.
|
||||
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
||||
volumesExist := kl.podVolumesExist(pod.UID)
|
||||
volumesExist := kl.podVolumesExist(logger, pod.UID)
|
||||
if volumesExist {
|
||||
klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -334,20 +334,20 @@ func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
|
|||
|
||||
// getPodVolumePathListFromDisk returns a list of the volume paths by reading the
|
||||
// volume directories for the given pod from the disk.
|
||||
func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, error) {
|
||||
func (kl *Kubelet) getPodVolumePathListFromDisk(logger klog.Logger, podUID types.UID) ([]string, error) {
|
||||
volumes := []string{}
|
||||
podVolDir := kl.getPodVolumesDir(podUID)
|
||||
|
||||
if pathExists, pathErr := mount.PathExists(podVolDir); pathErr != nil {
|
||||
return volumes, fmt.Errorf("error checking if path %q exists: %v", podVolDir, pathErr)
|
||||
} else if !pathExists {
|
||||
klog.V(6).InfoS("Path does not exist", "path", podVolDir)
|
||||
logger.V(6).Info("Path does not exist", "path", podVolDir)
|
||||
return volumes, nil
|
||||
}
|
||||
|
||||
volumePluginDirs, err := os.ReadDir(podVolDir)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Could not read directory", "path", podVolDir)
|
||||
logger.Error(err, "Could not read directory", "path", podVolDir)
|
||||
return volumes, err
|
||||
}
|
||||
for _, volumePluginDir := range volumePluginDirs {
|
||||
|
|
@ -378,9 +378,9 @@ func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, err
|
|||
return volumes, nil
|
||||
}
|
||||
|
||||
func (kl *Kubelet) getMountedVolumePathListFromDisk(podUID types.UID) ([]string, error) {
|
||||
func (kl *Kubelet) getMountedVolumePathListFromDisk(logger klog.Logger, podUID types.UID) ([]string, error) {
|
||||
mountedVolumes := []string{}
|
||||
volumePaths, err := kl.getPodVolumePathListFromDisk(podUID)
|
||||
volumePaths, err := kl.getPodVolumePathListFromDisk(logger, podUID)
|
||||
if err != nil {
|
||||
return mountedVolumes, err
|
||||
}
|
||||
|
|
@ -403,7 +403,7 @@ func (kl *Kubelet) getMountedVolumePathListFromDisk(podUID types.UID) ([]string,
|
|||
|
||||
// getPodVolumeSubpathListFromDisk returns a list of the volume-subpath paths by reading the
|
||||
// subpath directories for the given pod from the disk.
|
||||
func (kl *Kubelet) getPodVolumeSubpathListFromDisk(podUID types.UID) ([]string, error) {
|
||||
func (kl *Kubelet) getPodVolumeSubpathListFromDisk(logger klog.Logger, podUID types.UID) ([]string, error) {
|
||||
volumes := []string{}
|
||||
podSubpathsDir := kl.getPodVolumeSubpathsDir(podUID)
|
||||
|
||||
|
|
@ -416,7 +416,7 @@ func (kl *Kubelet) getPodVolumeSubpathListFromDisk(podUID types.UID) ([]string,
|
|||
// Explicitly walks /<volume>/<container name>/<subPathIndex>
|
||||
volumePluginDirs, err := os.ReadDir(podSubpathsDir)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Could not read directory", "path", podSubpathsDir)
|
||||
logger.Error(err, "Could not read directory", "path", podSubpathsDir)
|
||||
return volumes, err
|
||||
}
|
||||
for _, volumePluginDir := range volumePluginDirs {
|
||||
|
|
@ -467,10 +467,10 @@ func (kl *Kubelet) setCachedMachineInfo(info *cadvisorapiv1.MachineInfo) {
|
|||
}
|
||||
|
||||
// getLastStableNodeAddresses returns the last observed node addresses.
|
||||
func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
|
||||
func (kl *Kubelet) getLastObservedNodeAddresses(logger klog.Logger) []v1.NodeAddress {
|
||||
node, err := kl.GetNode()
|
||||
if err != nil || node == nil {
|
||||
klog.V(4).InfoS("fail to obtain node from local cache", "node", kl.nodeName, "error", err)
|
||||
logger.V(4).Info("fail to obtain node from local cache", "node", kl.nodeName, "error", err)
|
||||
return nil
|
||||
}
|
||||
return node.Status.Addresses
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
cloudproviderapi "k8s.io/cloud-provider/api"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
)
|
||||
|
||||
|
|
@ -285,6 +286,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) {
|
|||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||
defer testKubelet.Cleanup()
|
||||
kl := testKubelet.kubelet
|
||||
|
|
@ -295,7 +297,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) {
|
|||
nodeLister.nodes = append(nodeLister.nodes, tc.node)
|
||||
}
|
||||
kl.nodeLister = nodeLister
|
||||
addrs := kl.getLastObservedNodeAddresses()
|
||||
addrs := kl.getLastObservedNodeAddresses(logger)
|
||||
|
||||
if len(addrs) != len(tc.expectedAddrs) {
|
||||
t.Errorf("expected %d addresses, got %d", len(tc.expectedAddrs), len(addrs))
|
||||
|
|
|
|||
|
|
@ -1293,7 +1293,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
|
|||
// in the future (volumes, mount dirs, logs, and containers could all be
|
||||
// better separated)
|
||||
logger.V(3).Info("Clean up orphaned pod directories")
|
||||
err = kl.cleanupOrphanedPodDirs(allPods, runningRuntimePods)
|
||||
err = kl.cleanupOrphanedPodDirs(logger, allPods, runningRuntimePods)
|
||||
if err != nil {
|
||||
// We want all cleanup tasks to be run even if one of them failed. So
|
||||
// we just log an error here and continue other cleanup tasks.
|
||||
|
|
@ -2842,7 +2842,7 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(logger klog.Logger, pcm cm.PodConta
|
|||
// so any memory backed volumes don't have their charges propagated to the
|
||||
// parent croup. If the volumes still exist, reduce the cpu shares for any
|
||||
// process in the cgroup to the minimum value while we wait.
|
||||
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
|
||||
if podVolumesExist := kl.podVolumesExist(logger, uid); podVolumesExist {
|
||||
logger.V(3).Info("Orphaned pod found, but volumes not yet removed. Reducing cpu to minimum", "podUID", uid)
|
||||
if err := pcm.ReduceCPULimits(logger, val); err != nil {
|
||||
logger.Info("Failed to reduce cpu time for pod pending volume cleanup", "podUID", uid, "err", err)
|
||||
|
|
|
|||
|
|
@ -75,20 +75,20 @@ func (kl *Kubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.B
|
|||
|
||||
// podVolumesExist checks with the volume manager and returns true any of the
|
||||
// pods for the specified volume are mounted or are uncertain.
|
||||
func (kl *Kubelet) podVolumesExist(podUID types.UID) bool {
|
||||
func (kl *Kubelet) podVolumesExist(logger klog.Logger, podUID types.UID) bool {
|
||||
if kl.volumeManager.HasPossiblyMountedVolumesForPod(volumetypes.UniquePodName(podUID)) {
|
||||
return true
|
||||
}
|
||||
// TODO: This checks pod volume paths and whether they are mounted. If checking returns error, podVolumesExist will return true
|
||||
// which means we consider volumes might exist and requires further checking.
|
||||
// There are some volume plugins such as flexvolume might not have mounts. See issue #61229
|
||||
volumePaths, err := kl.getMountedVolumePathListFromDisk(podUID)
|
||||
volumePaths, err := kl.getMountedVolumePathListFromDisk(logger, podUID)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Pod found, but error occurred during checking mounted volumes from disk", "podUID", podUID)
|
||||
logger.Error(err, "Pod found, but error occurred during checking mounted volumes from disk", "podUID", podUID)
|
||||
return true
|
||||
}
|
||||
if len(volumePaths) > 0 {
|
||||
klog.V(4).InfoS("Pod found, but volumes are still mounted on disk", "podUID", podUID, "paths", volumePaths)
|
||||
logger.V(4).Info("Pod found, but volumes are still mounted on disk", "podUID", podUID, "paths", volumePaths)
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -116,11 +116,11 @@ func (kl *Kubelet) newVolumeMounterFromPlugins(spec *volume.Spec, pod *v1.Pod) (
|
|||
// when this is called, so it effectively does a recursive rmdir instead of
|
||||
// RemoveAll to ensure it only removes empty directories and files that were
|
||||
// used as mount points, but not content of the mount points.
|
||||
func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error {
|
||||
func (kl *Kubelet) removeOrphanedPodVolumeDirs(logger klog.Logger, uid types.UID) []error {
|
||||
orphanVolumeErrors := []error{}
|
||||
|
||||
// If there are still volume directories, attempt to rmdir them
|
||||
volumePaths, err := kl.getPodVolumePathListFromDisk(uid)
|
||||
volumePaths, err := kl.getPodVolumePathListFromDisk(logger, uid)
|
||||
if err != nil {
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error occurred during reading volume dir from disk: %v", uid, err))
|
||||
return orphanVolumeErrors
|
||||
|
|
@ -136,7 +136,7 @@ func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error {
|
|||
}
|
||||
|
||||
// If there are any volume-subpaths, attempt to remove them
|
||||
subpathVolumePaths, err := kl.getPodVolumeSubpathListFromDisk(uid)
|
||||
subpathVolumePaths, err := kl.getPodVolumeSubpathListFromDisk(logger, uid)
|
||||
if err != nil {
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error occurred during reading of volume-subpaths dir from disk: %v", uid, err))
|
||||
return orphanVolumeErrors
|
||||
|
|
@ -166,7 +166,7 @@ func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error {
|
|||
|
||||
// cleanupOrphanedPodDirs removes the volumes of pods that should not be
|
||||
// running and that have no containers running. Note that we roll up logs here since it runs in the main loop.
|
||||
func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecontainer.Pod) error {
|
||||
func (kl *Kubelet) cleanupOrphanedPodDirs(logger klog.Logger, pods []*v1.Pod, runningPods []*kubecontainer.Pod) error {
|
||||
allPods := sets.New[string]()
|
||||
for _, pod := range pods {
|
||||
allPods.Insert(string(pod.UID))
|
||||
|
|
@ -195,14 +195,14 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
|
|||
// Doing so may result in corruption of data.
|
||||
// TODO: getMountedVolumePathListFromDisk() call may be redundant with
|
||||
// kl.getPodVolumePathListFromDisk(). Can this be cleaned up?
|
||||
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
|
||||
if podVolumesExist := kl.podVolumesExist(logger, uid); podVolumesExist {
|
||||
errorPods++
|
||||
klog.V(3).InfoS("Orphaned pod found, but volumes are not cleaned up", "podUID", uid)
|
||||
logger.V(3).Info("Orphaned pod found, but volumes are not cleaned up", "podUID", uid)
|
||||
continue
|
||||
}
|
||||
|
||||
// Attempt to remove the pod volumes directory and its subdirectories
|
||||
podVolumeErrors := kl.removeOrphanedPodVolumeDirs(uid)
|
||||
podVolumeErrors := kl.removeOrphanedPodVolumeDirs(logger, uid)
|
||||
if len(podVolumeErrors) > 0 {
|
||||
errorPods++
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, podVolumeErrors...)
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/mount-utils"
|
||||
)
|
||||
|
|
@ -170,6 +171,7 @@ func TestCleanupOrphanedPodDirs(t *testing.T) {
|
|||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||
defer testKubelet.Cleanup()
|
||||
kubelet := testKubelet.kubelet
|
||||
|
|
@ -180,7 +182,7 @@ func TestCleanupOrphanedPodDirs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
err := kubelet.cleanupOrphanedPodDirs(tc.pods, nil)
|
||||
err := kubelet.cleanupOrphanedPodDirs(logger, tc.pods, nil)
|
||||
if tc.expectErr && err == nil {
|
||||
t.Errorf("%s failed: expected error, got success", name)
|
||||
}
|
||||
|
|
@ -292,6 +294,7 @@ func TestPodVolumesExistWithMount(t *testing.T) {
|
|||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||
defer testKubelet.Cleanup()
|
||||
kubelet := testKubelet.kubelet
|
||||
|
|
@ -302,7 +305,7 @@ func TestPodVolumesExistWithMount(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
exist := kubelet.podVolumesExist(poduid)
|
||||
exist := kubelet.podVolumesExist(logger, poduid)
|
||||
if tc.expected != exist {
|
||||
t.Errorf("%s failed: expected %t, got %t", name, tc.expected, exist)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -198,18 +198,18 @@ func TestPodVolumesExist(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
tCtx := ktesting.Init(t)
|
||||
logger, tCtx := ktesting.NewTestContext(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
kubelet.podManager.SetPods(pods)
|
||||
for _, pod := range pods {
|
||||
err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
|
||||
err := kubelet.volumeManager.WaitForAttachAndMount(tCtx, pod)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, pod := range pods {
|
||||
podVolumesExist := kubelet.podVolumesExist(pod.UID)
|
||||
podVolumesExist := kubelet.podVolumesExist(logger, pod.UID)
|
||||
assert.True(t, podVolumesExist, "pod %q", pod.UID)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue