From e94e67337dfe4864665db25b3d3d1957c6234a92 Mon Sep 17 00:00:00 2001 From: hoteye Date: Fri, 13 Feb 2026 08:35:48 +0800 Subject: [PATCH 1/5] kubelet: tighten podworkers context flow and consolidate kubelet logcheck scope Propagate upper-level context through pod worker update and sync paths in the kubelet call flow touched by this PR. Also consolidate kubelet logcheck scope so non-contextual logging additions are blocked in pkg/kubelet while keeping behavior unchanged. --- hack/golangci-hints.yaml | 45 +----- hack/golangci.yaml | 45 +----- hack/logcheck.conf | 45 +----- pkg/kubelet/kubelet.go | 28 ++-- pkg/kubelet/kubelet_pods.go | 8 +- pkg/kubelet/kubelet_pods_test.go | 40 +++--- pkg/kubelet/kubelet_test.go | 133 +++++++++--------- pkg/kubelet/pod_workers.go | 188 +++++++++++++------------ pkg/kubelet/pod_workers_test.go | 226 ++++++++++++++++++------------- 9 files changed, 333 insertions(+), 425 deletions(-) diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index 730ca6416c7..f0292cb0732 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -322,50 +322,7 @@ linters: contextual k8s.io/kubernetes/pkg/securitycontext/.* contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/test/images/sample-device-plugin/.* - contextual k8s.io/kubernetes/pkg/kubelet/allocation/.* - contextual k8s.io/kubernetes/pkg/kubelet/apis/.* - contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* - contextual k8s.io/kubernetes/pkg/kubelet/certificate/.* - contextual k8s.io/kubernetes/pkg/kubelet/checkpointmanager/.* - contextual k8s.io/kubernetes/pkg/kubelet/client/.* - contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* - contextual k8s.io/kubernetes/pkg/kubelet/cm/.* - contextual k8s.io/kubernetes/pkg/kubelet/config/.* - contextual k8s.io/kubernetes/pkg/kubelet/configmap/.* - contextual k8s.io/kubernetes/pkg/kubelet/container/.* - contextual k8s.io/kubernetes/pkg/kubelet/envvars/.* - contextual 
k8s.io/kubernetes/pkg/kubelet/events/.* - contextual k8s.io/kubernetes/pkg/kubelet/eviction/.* - contextual k8s.io/kubernetes/pkg/kubelet/images/.* - contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.* - contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.* - contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.* - contextual k8s.io/kubernetes/pkg/kubelet/logs/.* - contextual k8s.io/kubernetes/pkg/kubelet/metrics/.* - contextual k8s.io/kubernetes/pkg/kubelet/network/.* - contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.* - contextual k8s.io/kubernetes/pkg/kubelet/nodestatus/.* - contextual k8s.io/kubernetes/pkg/kubelet/oom/.* - contextual k8s.io/kubernetes/pkg/kubelet/pleg/.* - contextual k8s.io/kubernetes/pkg/kubelet/pluginmanager/.* - contextual k8s.io/kubernetes/pkg/kubelet/pod/.* - contextual k8s.io/kubernetes/pkg/kubelet/podcertificate/.* - contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* - contextual k8s.io/kubernetes/pkg/kubelet/prober/.* - contextual k8s.io/kubernetes/pkg/kubelet/qos/.* - contextual k8s.io/kubernetes/pkg/kubelet/runtimeclass/.* - contextual k8s.io/kubernetes/pkg/kubelet/secret/.* - contextual k8s.io/kubernetes/pkg/kubelet/server/.* - contextual k8s.io/kubernetes/pkg/kubelet/stats/.* - contextual k8s.io/kubernetes/pkg/kubelet/status/.* - contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* - contextual k8s.io/kubernetes/pkg/kubelet/token/.* - contextual k8s.io/kubernetes/pkg/kubelet/types/.* - contextual k8s.io/kubernetes/pkg/kubelet/userns/.* - contextual k8s.io/kubernetes/pkg/kubelet/util/.* - contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* - contextual k8s.io/kubernetes/pkg/kubelet/watchdog/.* - contextual k8s.io/kubernetes/pkg/kubelet/winstats/.* + contextual k8s.io/kubernetes/pkg/kubelet/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. 
Once it is GA, we can lift diff --git a/hack/golangci.yaml b/hack/golangci.yaml index f6988d1d73b..0a1bf73b109 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -331,50 +331,7 @@ linters: contextual k8s.io/kubernetes/pkg/securitycontext/.* contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/test/images/sample-device-plugin/.* - contextual k8s.io/kubernetes/pkg/kubelet/allocation/.* - contextual k8s.io/kubernetes/pkg/kubelet/apis/.* - contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* - contextual k8s.io/kubernetes/pkg/kubelet/certificate/.* - contextual k8s.io/kubernetes/pkg/kubelet/checkpointmanager/.* - contextual k8s.io/kubernetes/pkg/kubelet/client/.* - contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* - contextual k8s.io/kubernetes/pkg/kubelet/cm/.* - contextual k8s.io/kubernetes/pkg/kubelet/config/.* - contextual k8s.io/kubernetes/pkg/kubelet/configmap/.* - contextual k8s.io/kubernetes/pkg/kubelet/container/.* - contextual k8s.io/kubernetes/pkg/kubelet/envvars/.* - contextual k8s.io/kubernetes/pkg/kubelet/events/.* - contextual k8s.io/kubernetes/pkg/kubelet/eviction/.* - contextual k8s.io/kubernetes/pkg/kubelet/images/.* - contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.* - contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.* - contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.* - contextual k8s.io/kubernetes/pkg/kubelet/logs/.* - contextual k8s.io/kubernetes/pkg/kubelet/metrics/.* - contextual k8s.io/kubernetes/pkg/kubelet/network/.* - contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.* - contextual k8s.io/kubernetes/pkg/kubelet/nodestatus/.* - contextual k8s.io/kubernetes/pkg/kubelet/oom/.* - contextual k8s.io/kubernetes/pkg/kubelet/pleg/.* - contextual k8s.io/kubernetes/pkg/kubelet/pluginmanager/.* - contextual k8s.io/kubernetes/pkg/kubelet/pod/.* - contextual k8s.io/kubernetes/pkg/kubelet/podcertificate/.* - contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* - contextual 
k8s.io/kubernetes/pkg/kubelet/prober/.* - contextual k8s.io/kubernetes/pkg/kubelet/qos/.* - contextual k8s.io/kubernetes/pkg/kubelet/runtimeclass/.* - contextual k8s.io/kubernetes/pkg/kubelet/secret/.* - contextual k8s.io/kubernetes/pkg/kubelet/server/.* - contextual k8s.io/kubernetes/pkg/kubelet/stats/.* - contextual k8s.io/kubernetes/pkg/kubelet/status/.* - contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* - contextual k8s.io/kubernetes/pkg/kubelet/token/.* - contextual k8s.io/kubernetes/pkg/kubelet/types/.* - contextual k8s.io/kubernetes/pkg/kubelet/userns/.* - contextual k8s.io/kubernetes/pkg/kubelet/util/.* - contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* - contextual k8s.io/kubernetes/pkg/kubelet/watchdog/.* - contextual k8s.io/kubernetes/pkg/kubelet/winstats/.* + contextual k8s.io/kubernetes/pkg/kubelet/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/logcheck.conf b/hack/logcheck.conf index e18e15bc373..930c039659d 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -64,50 +64,7 @@ contextual k8s.io/kubernetes/pkg/security/.* contextual k8s.io/kubernetes/pkg/securitycontext/.* contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/test/images/sample-device-plugin/.* -contextual k8s.io/kubernetes/pkg/kubelet/allocation/.* -contextual k8s.io/kubernetes/pkg/kubelet/apis/.* -contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* -contextual k8s.io/kubernetes/pkg/kubelet/certificate/.* -contextual k8s.io/kubernetes/pkg/kubelet/checkpointmanager/.* -contextual k8s.io/kubernetes/pkg/kubelet/client/.* -contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* -contextual k8s.io/kubernetes/pkg/kubelet/cm/.* -contextual k8s.io/kubernetes/pkg/kubelet/config/.* -contextual k8s.io/kubernetes/pkg/kubelet/configmap/.* -contextual k8s.io/kubernetes/pkg/kubelet/container/.* -contextual 
k8s.io/kubernetes/pkg/kubelet/envvars/.* -contextual k8s.io/kubernetes/pkg/kubelet/events/.* -contextual k8s.io/kubernetes/pkg/kubelet/eviction/.* -contextual k8s.io/kubernetes/pkg/kubelet/images/.* -contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.* -contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.* -contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.* -contextual k8s.io/kubernetes/pkg/kubelet/logs/.* -contextual k8s.io/kubernetes/pkg/kubelet/metrics/.* -contextual k8s.io/kubernetes/pkg/kubelet/network/.* -contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.* -contextual k8s.io/kubernetes/pkg/kubelet/nodestatus/.* -contextual k8s.io/kubernetes/pkg/kubelet/oom/.* -contextual k8s.io/kubernetes/pkg/kubelet/pleg/.* -contextual k8s.io/kubernetes/pkg/kubelet/pluginmanager/.* -contextual k8s.io/kubernetes/pkg/kubelet/pod/.* -contextual k8s.io/kubernetes/pkg/kubelet/podcertificate/.* -contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* -contextual k8s.io/kubernetes/pkg/kubelet/prober/.* -contextual k8s.io/kubernetes/pkg/kubelet/qos/.* -contextual k8s.io/kubernetes/pkg/kubelet/runtimeclass/.* -contextual k8s.io/kubernetes/pkg/kubelet/secret/.* -contextual k8s.io/kubernetes/pkg/kubelet/server/.* -contextual k8s.io/kubernetes/pkg/kubelet/stats/.* -contextual k8s.io/kubernetes/pkg/kubelet/status/.* -contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* -contextual k8s.io/kubernetes/pkg/kubelet/token/.* -contextual k8s.io/kubernetes/pkg/kubelet/types/.* -contextual k8s.io/kubernetes/pkg/kubelet/userns/.* -contextual k8s.io/kubernetes/pkg/kubelet/util/.* -contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* -contextual k8s.io/kubernetes/pkg/kubelet/watchdog/.* -contextual k8s.io/kubernetes/pkg/kubelet/winstats/.* +contextual k8s.io/kubernetes/pkg/kubelet/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. 
Once it is GA, we can lift diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5a11e482496..0ef90c56c5f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1020,7 +1020,7 @@ func NewMainKubelet(ctx context.Context, // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, - killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) + killPodNow(logger, klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) klet.evictionManager = evictionManager @@ -1061,7 +1061,7 @@ func NewMainKubelet(ctx context.Context, handlers = append(handlers, klet.containerManager.GetAllocateResourcesPodAdmitHandler()) - criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getAllocatedPods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) + criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getAllocatedPods, killPodNow(logger, klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) handlers = append(handlers, lifecycle.NewPredicateAdmitHandler(klet.GetCachedNode, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)) // apply functional Option's for _, opt := range kubeDeps.Options { @@ -1100,7 +1100,7 @@ func NewMainKubelet(ctx context.Context, Recorder: kubeDeps.Recorder, NodeRef: nodeRef, GetPodsFunc: klet.GetActivePods, - KillPodFunc: killPodNow(klet.podWorkers, kubeDeps.Recorder), + KillPodFunc: killPodNow(logger, klet.podWorkers, kubeDeps.Recorder), SyncNodeStatusFunc: klet.syncNodeStatus, ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration, ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration, @@ -2453,7 +2453,7 @@ func (kl *Kubelet) getPodsToSync() 
[]*v1.Pod { // // deletePod returns an error if not all sources are ready or the pod is not // found in the runtime cache. -func (kl *Kubelet) deletePod(logger klog.Logger, pod *v1.Pod) error { +func (kl *Kubelet) deletePod(ctx context.Context, logger klog.Logger, pod *v1.Pod) error { if pod == nil { return fmt.Errorf("deletePod does not allow nil pod") } @@ -2463,7 +2463,7 @@ func (kl *Kubelet) deletePod(logger klog.Logger, pod *v1.Pod) error { return fmt.Errorf("skipping delete because sources aren't ready yet") } logger.V(3).Info("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID) - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodKill, }) @@ -2631,7 +2631,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety if e.Type == pleg.ContainerDied { if containerID, ok := e.Data.(string); ok { - kl.cleanUpContainersInPod(e.ID, containerID) + kl.cleanUpContainersInPod(ctx, e.ID, containerID) } } case <-syncCh: @@ -2736,7 +2736,7 @@ func (kl *Kubelet) HandlePodAdditions(ctx context.Context, pods []*v1.Pod) { logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) continue } - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodUpdate, @@ -2776,7 +2776,7 @@ func (kl *Kubelet) HandlePodAdditions(ctx context.Context, pods []*v1.Pod) { } } } - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodCreate, @@ -2848,7 +2848,7 @@ func (kl *Kubelet) HandlePodUpdates(ctx context.Context, pods []*v1.Pod) { } } - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodUpdate, @@ 
-2959,7 +2959,7 @@ func (kl *Kubelet) HandlePodRemoves(ctx context.Context, pods []*v1.Pod) { logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) continue } - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodUpdate, @@ -2970,7 +2970,7 @@ func (kl *Kubelet) HandlePodRemoves(ctx context.Context, pods []*v1.Pod) { // Deletion is allowed to fail because the periodic cleanup routine // will trigger deletion again. - if err := kl.deletePod(logger, pod); err != nil { + if err := kl.deletePod(ctx, logger, pod); err != nil { logger.V(2).Info("Failed to delete pod", "pod", klog.KObj(pod), "err", err) } } @@ -3035,7 +3035,7 @@ func (kl *Kubelet) HandlePodReconcile(ctx context.Context, pods []*v1.Pod) { // be different than Sync, or if there is a better place for it. For instance, we have // needsReconcile in kubelet/config, here, and in status_manager. if status.NeedToReconcilePodReadiness(pod) { - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodSync, @@ -3080,7 +3080,7 @@ func (kl *Kubelet) HandlePodSyncs(ctx context.Context, pods []*v1.Pod) { logger.V(3).Info("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID) continue } - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodSync, @@ -3220,7 +3220,7 @@ func (kl *Kubelet) ListenAndServePodResources(ctx context.Context) { } // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. 
-func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) { +func (kl *Kubelet) cleanUpContainersInPod(ctx context.Context, podID types.UID, exitedContainerID string) { if podStatus, err := kl.podCache.Get(podID); err == nil { // When an evicted or deleted pod has already synced, all containers can be removed. removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID) diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 94f2a2d7208..cf6a74a3ac3 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1223,7 +1223,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // Stop the workers for terminated pods not in the config source logger.V(3).Info("Clean up pod workers for terminated pods") - workingPods := kl.podWorkers.SyncKnownPods(allPods) + workingPods := kl.podWorkers.SyncKnownPods(logger, allPods) // Reconcile: At this point the pod workers have been pruned to the set of // desired pods. Pods that must be restarted due to UID reuse, or leftover @@ -1348,7 +1348,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { logger.V(2).Info("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID) continue } - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, Pod: pod, MirrorPod: mirrorPod, @@ -1377,7 +1377,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // next invocation of HandlePodCleanups. 
for _, pod := range kl.filterTerminalPodsToDelete(allPods, runningRuntimePods, workingPods) { logger.V(3).Info("Handling termination and deletion of the pod to pod workers", "pod", klog.KObj(pod), "podUID", pod.UID) - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, Pod: pod, }) @@ -1400,7 +1400,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { PodTerminationGracePeriodSecondsOverride: &one, } logger.V(2).Info("Clean up containers for orphaned pod we had not seen before", "podUID", runningPod.ID, "killPodOptions", killPodOptions) - kl.podWorkers.UpdatePod(UpdatePodOptions{ + kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, RunningPod: runningPod, KillPodOptions: killPodOptions, diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 119270572ba..f5852f6e69c 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -49,7 +49,6 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/klog/v2" "k8s.io/kubelet/pkg/cri/streaming/portforward" "k8s.io/kubelet/pkg/cri/streaming/remotecommand" _ "k8s.io/kubernetes/pkg/apis/core/install" @@ -4858,11 +4857,12 @@ func Test_generateAPIPodStatus(t *testing.T) { for _, test := range tests { for _, enablePodReadyToStartContainersCondition := range []bool{false, true} { t.Run(test.name, func(t *testing.T) { - logger, tCtx := ktesting.NewTestContext(t) + tCtx := ktesting.Init(t) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodReadyToStartContainersCondition, enablePodReadyToStartContainersCondition) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet + logger, _ := ktesting.NewTestContext(t) 
kl.statusManager.SetPodStatus(logger, test.pod, test.previousStatus) for _, name := range test.unreadyContainer { kl.readinessManager.Set(kubecontainer.BuildContainerID("", findContainerStatusByName(test.expected, name).ContainerID), results.Failure, test.pod) @@ -6450,7 +6450,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { prepareWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) { // send a create pod := simplePod() - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -6475,7 +6475,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { }, }, } - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, StartTime: time.Unix(3, 0).UTC(), Pod: updatedPod, @@ -6851,7 +6851,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { }, }, } - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -6874,7 +6874,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { }, }, } - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, StartTime: time.Unix(3, 0).UTC(), Pod: updatedPod, @@ -6923,7 +6923,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { // block startup of the static pod due to full name collision w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2") - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -6996,7 +6996,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { // block startup of the static pod due to full name collision w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2") - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: 
kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -7059,7 +7059,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { // block startup of the static pod due to full name collision w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2") - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -7104,7 +7104,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { // send a create of a static pod pod := staticPod() - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -7112,14 +7112,14 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { drainAllWorkers(w) // terminate the pod (which won't complete) and then deliver a recreate by that same UID - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, StartTime: time.Unix(2, 0).UTC(), Pod: pod, }) pod = staticPod() pod.Annotations["version"] = "2" - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(3, 0).UTC(), Pod: pod, @@ -7213,7 +7213,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { // send a create of a static pod pod := staticPod() - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -7256,7 +7256,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { // send a create of a static pod pod := staticPod() - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, @@ -7343,19 +7343,19 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { prepareWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) { // simulate a delete and 
recreate of the static pod pod := simplePod() - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, StartTime: time.Unix(1, 0).UTC(), Pod: pod, }) drainAllWorkers(w) - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, Pod: pod, }) pod2 := simplePod() pod2.Annotations = map[string]string{"version": "2"} - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, Pod: pod2, }) @@ -7444,7 +7444,8 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { defer testKubelet.Cleanup() kl := testKubelet.kubelet - podWorkers, _, processed := createPodWorkers() + logger, _ := ktesting.NewTestContext(t) + podWorkers, _, processed := createPodWorkers(logger) kl.podWorkers = podWorkers originalPodSyncer := podWorkers.podSyncer syncFuncs := newPodSyncerFuncs(originalPodSyncer) @@ -7517,8 +7518,7 @@ func testMetric(t *testing.T, metricName string, expectedMetric string) { } func TestGetNonExistentImagePullSecret(t *testing.T) { - _, tCtx := ktesting.NewTestContext(t) - logger := klog.FromContext(tCtx) + logger, _ := ktesting.NewTestContext(t) secrets := make([]*v1.Secret, 0) fakeRecorder := record.NewFakeRecorder(1) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 69103b8b226..ee3d90b4414 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -149,7 +149,8 @@ type fakeImageGCManager struct { } func (f *fakeImageGCManager) GetImageList() ([]kubecontainer.Image, error) { - return f.fakeImageService.ListImages(context.Background()) + // ImageGCManager interface does not accept a context parameter. 
+ return f.fakeImageService.ListImages(context.TODO()) } type TestKubelet struct { @@ -393,7 +394,7 @@ func newTestKubeletWithImageList( } // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, - killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) + killPodNow(logger, kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) kubelet.evictionManager = evictionManager handlers := []lifecycle.PodAdmitHandler{} @@ -405,7 +406,7 @@ func newTestKubeletWithImageList( Recorder: fakeRecorder, NodeRef: nodeRef, GetPodsFunc: kubelet.podManager.GetPods, - KillPodFunc: killPodNow(kubelet.podWorkers, fakeRecorder), + KillPodFunc: killPodNow(logger, kubelet.podWorkers, fakeRecorder), SyncNodeStatusFunc: func(context.Context) {}, ShutdownGracePeriodRequested: 0, ShutdownGracePeriodCriticalPods: 0, @@ -483,7 +484,7 @@ func newTestPods(count int) []*v1.Pod { } func TestSyncLoopAbort(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -496,15 +497,15 @@ func TestSyncLoopAbort(t *testing.T) { close(ch) // sanity check (also prevent this test from hanging in the next step) - ok := kubelet.syncLoopIteration(ctx, ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1)) + ok := kubelet.syncLoopIteration(tCtx, ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1)) require.False(t, ok, "Expected syncLoopIteration to return !ok since update chan was closed") // this should terminate immediately; if it hangs then the syncLoopIteration isn't aborting properly 
- kubelet.syncLoop(ctx, ch, kubelet) + kubelet.syncLoop(tCtx, ch, kubelet) } func TestSyncPodsStartPod(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -517,12 +518,12 @@ func TestSyncPodsStartPod(t *testing.T) { }), } kubelet.podManager.SetPods(pods) - kubelet.HandlePodSyncs(ctx, pods) + kubelet.HandlePodSyncs(tCtx, pods) fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)}) } func TestHandlePodCleanupsPerQOS(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -550,7 +551,7 @@ func TestHandlePodCleanupsPerQOS(t *testing.T) { // within a goroutine so a two second delay should be enough time to // mark the pod as killed (within this test case). - kubelet.HandlePodCleanups(ctx) + require.NoError(t, kubelet.HandlePodCleanups(tCtx)) // assert that unwanted pods were killed if actual, expected := kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion, []types.UID{"12345678"}; !reflect.DeepEqual(actual, expected) { @@ -561,9 +562,9 @@ func TestHandlePodCleanupsPerQOS(t *testing.T) { // simulate Runtime.KillPod fakeRuntime.PodList = nil - kubelet.HandlePodCleanups(ctx) - kubelet.HandlePodCleanups(ctx) - kubelet.HandlePodCleanups(ctx) + require.NoError(t, kubelet.HandlePodCleanups(tCtx)) + require.NoError(t, kubelet.HandlePodCleanups(tCtx)) + require.NoError(t, kubelet.HandlePodCleanups(tCtx)) destroyCount := 0 err := wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) { @@ -587,6 +588,7 @@ func TestHandlePodCleanupsPerQOS(t *testing.T) { } func TestDispatchWorkOfCompletedPod(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -657,7 +659,7 
@@ func TestDispatchWorkOfCompletedPod(t *testing.T) { }, } for _, pod := range pods { - kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + kubelet.podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodSync, StartTime: time.Now(), @@ -670,6 +672,7 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) { } func TestDispatchWorkOfActivePod(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -715,7 +718,7 @@ func TestDispatchWorkOfActivePod(t *testing.T) { } for _, pod := range pods { - kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + kubelet.podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodSync, StartTime: time.Now(), @@ -757,7 +760,7 @@ func TestHandlePodCleanups(t *testing.T) { } func TestVolumeAttachLimitExceededCleanup(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) const podCount = 500 tk := newTestKubelet(t, true /* controller-attach-detach enabled */) defer tk.Cleanup() @@ -786,11 +789,11 @@ func TestVolumeAttachLimitExceededCleanup(t *testing.T) { pods, _ := newTestPodsWithResources(podCount) kl.podManager.SetPods(pods) - kl.HandlePodSyncs(ctx, pods) + kl.HandlePodSyncs(tCtx, pods) // all pods must reach a terminal, Failed state due to VolumeAttachmentLimitExceeded. if err := wait.PollUntilContextTimeout( - ctx, 200*time.Millisecond, 30*time.Second, true, + tCtx, 200*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) { for _, p := range pods { st, ok := kl.statusManager.GetPodStatus(p.UID) @@ -805,7 +808,7 @@ func TestVolumeAttachLimitExceededCleanup(t *testing.T) { // validate that SyncTerminatedPod completed successfully for each pod. 
if err := wait.PollUntilContextTimeout( - ctx, 200*time.Millisecond, 30*time.Second, true, + tCtx, 200*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) { for _, p := range pods { if !kl.podWorkers.ShouldPodBeFinished(p.UID) { @@ -858,7 +861,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) ready := false @@ -885,7 +888,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) { kubelet := testKubelet.kubelet kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.Set[string]) bool { return ready }) - kubelet.HandlePodRemoves(ctx, pods) + kubelet.HandlePodRemoves(tCtx, pods) time.Sleep(2 * time.Second) // Sources are not ready yet. Don't remove any pods. @@ -894,7 +897,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) { } ready = true - kubelet.HandlePodRemoves(ctx, pods) + kubelet.HandlePodRemoves(tCtx, pods) time.Sleep(2 * time.Second) // Sources are ready. Remove unwanted pods. @@ -929,7 +932,7 @@ func checkPodStatus(t *testing.T, kl *Kubelet, pod *v1.Pod, phase v1.PodPhase) { // Tests that we handle port conflicts correctly by setting the failed status in status map. func TestHandlePortConflicts(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -971,7 +974,7 @@ func TestHandlePortConflicts(t *testing.T) { pods[1].UID: true, } - kl.HandlePodAdditions(ctx, pods) + kl.HandlePodAdditions(tCtx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -980,7 +983,7 @@ func TestHandlePortConflicts(t *testing.T) { // Tests that we handle host name conflicts correctly by setting the failed status in status map. 
func TestHandleHostNameConflicts(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1015,7 +1018,7 @@ func TestHandleHostNameConflicts(t *testing.T) { notfittingPod := pods[0] fittingPod := pods[1] - kl.HandlePodAdditions(ctx, pods) + kl.HandlePodAdditions(tCtx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -1024,7 +1027,7 @@ func TestHandleHostNameConflicts(t *testing.T) { // Tests that we handle not matching labels selector correctly by setting the failed status in status map. func TestHandleNodeSelector(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1058,7 +1061,7 @@ func TestHandleNodeSelector(t *testing.T) { notfittingPod := pods[0] fittingPod := pods[1] - kl.HandlePodAdditions(ctx, pods) + kl.HandlePodAdditions(tCtx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -1095,7 +1098,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1123,7 +1126,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) { pod := podWithUIDNameNsSpec("123456789", "podA", "foo", v1.PodSpec{NodeSelector: test.podSelector}) - kl.HandlePodAdditions(ctx, []*v1.Pod{pod}) + kl.HandlePodAdditions(tCtx, []*v1.Pod{pod}) // Check pod status stored in the status map. 
checkPodStatus(t, kl, pod, test.podStatus) @@ -1133,7 +1136,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) { // Tests that we handle exceeded resources correctly by setting the failed status in status map. func TestHandleMemExceeded(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1179,7 +1182,7 @@ func TestHandleMemExceeded(t *testing.T) { pods[1].UID: true, } - kl.HandlePodAdditions(ctx, pods) + kl.HandlePodAdditions(tCtx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -1189,7 +1192,7 @@ func TestHandleMemExceeded(t *testing.T) { // Tests that we handle result of interface UpdatePluginResources correctly // by setting corresponding status in status map. func TestHandlePluginResources(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, false /*enableResizing*/) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1311,7 +1314,7 @@ func TestHandlePluginResources(t *testing.T) { missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec) failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec) - kl.HandlePodAdditions(ctx, []*v1.Pod{fittingPod, emptyPod, missingPod, failedPod}) + kl.HandlePodAdditions(tCtx, []*v1.Pod{fittingPod, emptyPod, missingPod, failedPod}) // Check pod status stored in the status map. checkPodStatus(t, kl, fittingPod, v1.PodPending) @@ -1322,7 +1325,7 @@ func TestHandlePluginResources(t *testing.T) { // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal. 
func TestPurgingObsoleteStatusMapEntries(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -1333,13 +1336,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { } podToTest := pods[1] // Run once to populate the status map. - kl.HandlePodAdditions(ctx, pods) + kl.HandlePodAdditions(tCtx, pods) if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found { t.Fatalf("expected to have status cached for pod2") } // Sync with empty pods so that the entry in status map will be removed. kl.podManager.SetPods([]*v1.Pod{}) - kl.HandlePodCleanups(ctx) + require.NoError(t, kl.HandlePodCleanups(tCtx)) if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found { t.Fatalf("expected to not have status cached for pod2") } @@ -1837,7 +1840,7 @@ func TestCheckpointContainer(t *testing.T) { } func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() fakeRuntime := testKubelet.fakeRuntime @@ -1878,7 +1881,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { } // Let the pod worker sets the status to fail after this sync. 
- kubelet.HandlePodUpdates(ctx, pods) + kubelet.HandlePodUpdates(tCtx, pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.Equal(t, v1.PodFailed, status.Phase) @@ -1887,7 +1890,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { } func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() fakeRuntime := testKubelet.fakeRuntime @@ -1929,7 +1932,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { } kubelet.podManager.SetPods(pods) - kubelet.HandlePodUpdates(ctx, pods) + kubelet.HandlePodUpdates(tCtx, pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.NotEqual(t, v1.PodFailed, status.Phase) @@ -1953,7 +1956,7 @@ func podWithUIDNameNsSpec(uid types.UID, name, namespace string, spec v1.PodSpec } func TestDeletePodDirsForDeletedPods(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1964,26 +1967,26 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) { kl.podManager.SetPods(pods) // Sync to create pod directories. - kl.HandlePodSyncs(ctx, kl.podManager.GetPods()) + kl.HandlePodSyncs(tCtx, kl.podManager.GetPods()) for i := range pods { assert.True(t, dirExists(kl.getPodDir(pods[i].UID)), "Expected directory to exist for pod %d", i) } // Pod 1 has been deleted and no longer exists. 
kl.podManager.SetPods([]*v1.Pod{pods[0]}) - kl.HandlePodCleanups(ctx) + require.NoError(t, kl.HandlePodCleanups(tCtx)) assert.True(t, dirExists(kl.getPodDir(pods[0].UID)), "Expected directory to exist for pod 0") assert.False(t, dirExists(kl.getPodDir(pods[1].UID)), "Expected directory to be deleted for pod 1") } func syncAndVerifyPodDir(t *testing.T, testKubelet *TestKubelet, pods []*v1.Pod, podsToCheck []*v1.Pod, shouldExist bool) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) t.Helper() kl := testKubelet.kubelet kl.podManager.SetPods(pods) - kl.HandlePodSyncs(ctx, pods) - kl.HandlePodCleanups(ctx) + kl.HandlePodSyncs(tCtx, pods) + require.NoError(t, kl.HandlePodCleanups(tCtx)) for i, pod := range podsToCheck { exist := dirExists(kl.getPodDir(pod.UID)) assert.Equal(t, shouldExist, exist, "directory of pod %d", i) @@ -2716,7 +2719,7 @@ func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecyc // Test verifies that the kubelet invokes an admission handler during HandlePodAdditions. func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -2753,7 +2756,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) { kl.allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{&testPodAdmitHandler{podsToReject: podsToReject}}) - kl.HandlePodAdditions(ctx, pods) + kl.HandlePodAdditions(tCtx, pods) // Check pod status stored in the status map. 
checkPodStatus(t, kl, podToReject, v1.PodFailed) @@ -2956,7 +2959,7 @@ func TestPodResourceAllocationReset(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) if tc.existingPodAllocation != nil { // when kubelet restarts, AllocatedResources has already existed before adding pod err := kubelet.allocationManager.SetAllocatedResources(tc.existingPodAllocation) @@ -2964,7 +2967,7 @@ func TestPodResourceAllocationReset(t *testing.T) { t.Fatalf("failed to set pod allocation: %v", err) } } - kubelet.HandlePodAdditions(ctx, []*v1.Pod{tc.pod}) + kubelet.HandlePodAdditions(tCtx, []*v1.Pod{tc.pod}) allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(tc.pod.UID, tc.pod.Spec.Containers[0].Name) if !found { @@ -3048,6 +3051,7 @@ func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) { } func TestSyncTerminatingPodKillPod(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -3062,7 +3066,7 @@ func TestSyncTerminatingPodKillPod(t *testing.T) { kl.podManager.SetPods(pods) podStatus := &kubecontainer.PodStatus{ID: pod.UID} gracePeriodOverride := int64(0) - err := kl.SyncTerminatingPod(context.Background(), pod, podStatus, &gracePeriodOverride, func(podStatus *v1.PodStatus) { + err := kl.SyncTerminatingPod(tCtx, pod, podStatus, &gracePeriodOverride, func(podStatus *v1.PodStatus) { podStatus.Phase = v1.PodFailed podStatus.Reason = "reason" podStatus.Message = "message" @@ -3074,7 +3078,6 @@ func TestSyncTerminatingPodKillPod(t *testing.T) { } func TestSyncLabels(t *testing.T) { - tCtx := ktesting.Init(t) tests := []struct { name string existingNode *v1.Node @@ -3104,6 +3107,7 @@ func TestSyncLabels(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, 
false) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -3368,8 +3372,8 @@ func TestNewMainKubeletStandAlone(t *testing.T) { assert.NotNil(t, testMainKubelet, "testMainKubelet should not be nil") testMainKubelet.BirthCry() - ctx := ktesting.Init(t) - testMainKubelet.StartGarbageCollection(ctx) + tCtx = ktesting.Init(t) + testMainKubelet.StartGarbageCollection(tCtx) // Nil pointer panic can be reproduced if configmap manager is not nil. // See https://github.com/kubernetes/kubernetes/issues/113492 // pod := &v1.Pod{ @@ -3494,7 +3498,7 @@ func TestSyncPodSpans(t *testing.T) { EnableServiceLinks: ptr.To(false), }) - _, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{}) + _, err = kubelet.SyncPod(tCtx, kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{}) require.NoError(t, err) assert.NotEmpty(t, exp.GetSpans()) @@ -3827,6 +3831,7 @@ func TestCrashLoopBackOffConfiguration(t *testing.T) { } func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -3919,7 +3924,7 @@ func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) { t.Run(tc.name, func(t *testing.T) { testKubelet.fakeRuntime.SyncResults = tc.syncResults testKubelet.fakeRuntime.PodResizeInProgress = tc.podResizeInProgress - isTerminal, err := kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kubelet.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) require.False(t, isTerminal) if tc.expectedErr == "" { require.NoError(t, err) @@ -3953,7 +3958,7 @@ func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) { } func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) metrics.Register() 
metrics.ContainerRequestedResizes.Reset() @@ -4535,7 +4540,7 @@ func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) { kubelet.podManager.AddPod(initialPod) require.NoError(t, kubelet.allocationManager.SetAllocatedResources(initialPod)) - kubelet.HandlePodUpdates(ctx, []*v1.Pod{updatedPod}) + kubelet.HandlePodUpdates(tCtx, []*v1.Pod{updatedPod}) tc.updateExpectedFunc(&expectedMetrics) @@ -4598,7 +4603,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { t.Skip("InPlacePodVerticalScaling is not currently supported for Windows") } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, true /*enableResizing*/) defer testKubelet.Cleanup() @@ -4722,7 +4727,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { kubelet.allocationManager.PushPendingResize(pendingResizeDesired.UID) kubelet.statusManager.ClearPodResizePendingCondition(pendingResizeDesired.UID) - kubelet.HandlePodReconcile(ctx, []*v1.Pod{tc.newPod}) + kubelet.HandlePodReconcile(tCtx, []*v1.Pod{tc.newPod}) require.Equal(t, tc.shouldRetryPendingResize, kubelet.statusManager.IsPodResizeDeferred(pendingResizeDesired.UID)) kubelet.allocationManager.RemovePod(pendingResizeDesired.UID) @@ -4733,7 +4738,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { } func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) { - ctx := ktesting.Init(t) + tCtx := ktesting.Init(t) cpu1000m := resource.MustParse("1") mem1000M := resource.MustParse("1Gi") cpu2000m := resource.MustParse("2") @@ -4855,7 +4860,7 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - logger := klog.FromContext(ctx) + logger := klog.FromContext(tCtx) featuregatetesting.SetFeatureGateDuringTest(t, 
utilfeature.DefaultFeatureGate, features.NodeDeclaredFeatures, tc.featureGateEnabled) testKubelet := newTestKubelet(t, false) @@ -4878,7 +4883,7 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) { } kubelet.statusManager.SetPodStatus(logger, tc.newPod, v1.PodStatus{Phase: v1.PodRunning}) - kubelet.HandlePodUpdates(ctx, []*v1.Pod{tc.newPod}) + kubelet.HandlePodUpdates(tCtx, []*v1.Pod{tc.newPod}) if tc.expectEvent { select { case event := <-recorder.Events: diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index d2539a8bd70..4822b8e99d1 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -164,7 +164,7 @@ type PodWorkers interface { // UpdatePod() calls will be ignored for that pod until it has been forgotten // due to significant time passing. A pod that is terminated will never be // restarted. - UpdatePod(options UpdatePodOptions) + UpdatePod(ctx context.Context, options UpdatePodOptions) // SyncKnownPods removes workers for pods that are not in the desiredPods set // and have been terminated for a significant period of time. Once this method // has been called once, the workers are assumed to be fully initialized and @@ -172,7 +172,7 @@ type PodWorkers interface { // true. It returns a map describing the state of each known pod worker. It // is the responsibility of the caller to re-add any desired pods that are not // returned as knownPods. - SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) + SyncKnownPods(logger klog.Logger, desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) // IsPodKnownTerminated returns true once SyncTerminatingPod completes // successfully - the provided pod UID it is known by the pod @@ -335,11 +335,6 @@ const ( // podSyncStatus tracks per-pod transitions through the three phases of pod // worker sync (setup, terminating, terminated). type podSyncStatus struct { - // ctx is the context that is associated with the current pod sync. 
- // TODO: remove this from the struct by having the context initialized - // in startPodSync, the cancelFn used by UpdatePod, and cancellation of - // a parent context for tearing down workers (if needed) on shutdown - ctx context.Context // cancelFn if set is expected to cancel the current podSyncer operation. cancelFn context.CancelFunc @@ -752,7 +747,8 @@ func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool { // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable, // terminating, or terminated, and will transition to terminating if: deleted on the apiserver, // discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet. -func (p *podWorkers) UpdatePod(options UpdatePodOptions) { +func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { + logger := klog.FromContext(ctx) // Handle when the pod is an orphan (no config) and we only have runtime status by running only // the terminating part of the lifecycle. 
A running pod contains only a minimal set of information // about the pod @@ -763,7 +759,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { if options.Pod == nil { // the sythetic pod created here is used only as a placeholder and not tracked if options.UpdateType != kubetypes.SyncPodKill { - klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType) + logger.Info("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType) return } uid, ns, name = runningPod.ID, runningPod.Namespace, runningPod.Name @@ -771,7 +767,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { } else { options.RunningPod = nil uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name - klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.Info("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) } } else { uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name @@ -785,7 +781,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { now := p.clock.Now() status, ok := p.podSyncStatuses[uid] if !ok { - klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) firstTime = true status = &podSyncStatus{ syncedAt: now, @@ -850,14 +846,14 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { if !firstTime && status.IsTerminationRequested() { if 
options.UpdateType == kubetypes.SyncPodCreate { status.restartRequested = true - klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) return } } // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping) if status.IsFinished() { - klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) return } @@ -866,25 +862,25 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { if !status.IsTerminationRequested() { switch { case isRuntimePod: - klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) status.deleted = true status.terminatingAt = now becameTerminating = true case pod.DeletionTimestamp != nil: - klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) status.deleted = true status.terminatingAt = now becameTerminating = true case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded: - klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), 
"podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) status.terminatingAt = now becameTerminating = true case options.UpdateType == kubetypes.SyncPodKill: if options.KillPodOptions != nil && options.KillPodOptions.Evict { - klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) status.evicted = true } else { - klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(4).Info("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) } status.terminatingAt = now becameTerminating = true @@ -899,7 +895,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { // due to housekeeping seeing an older cached version of the runtime pod simply ignore it until // after the pod worker completes. 
if isRuntimePod { - klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + logger.V(3).Info("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) return } @@ -966,8 +962,8 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { // TODO: this should be a wait.Until with backoff to handle panics, and // accept a context for shutdown defer runtime.HandleCrash() - defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid) - p.podWorkerLoop(uid, outCh) + defer logger.V(3).Info("Pod worker has stopped", "podUID", uid) + p.podWorkerLoop(ctx, uid, outCh) }() } @@ -983,14 +979,14 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { status.pendingUpdate.Pod, _ = p.allocationManager.UpdatePodFromAllocation(options.Pod) } status.working = true - klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) + logger.V(4).Info("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) select { case podUpdates <- struct{}{}: default: } if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil { - klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) + logger.V(3).Info("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) status.cancelFn() return } @@ -1035,7 +1031,7 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options * // it requeues the pod and returns false. If the pod will never be able to start // because data is missing, or the pod was terminated before start, canEverStart // is false. 
This method can only be called while holding the pod lock. -func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) { +func (p *podWorkers) allowPodStart(logger klog.Logger, pod *v1.Pod) (canStart bool, canEverStart bool) { if !kubetypes.IsStaticPod(pod) { // TODO: Do we want to allow non-static pods with the same full name? // Note that it may disable the force deletion of pods. @@ -1043,7 +1039,7 @@ func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart boo } status, ok := p.podSyncStatuses[pod.UID] if !ok { - klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.Error(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID) return false, false } if status.IsTerminationRequested() { @@ -1094,14 +1090,14 @@ func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool { // cleanupUnstartedPod is invoked if a pod that has never been started receives a termination // signal before it can be started. This method must be called holding the pod lock. 
-func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) { +func (p *podWorkers) cleanupUnstartedPod(logger klog.Logger, pod *v1.Pod, status *podSyncStatus) { p.cleanupPodUpdates(pod.UID) if status.terminatingAt.IsZero() { - klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) } if !status.terminatedAt.IsZero() { - klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) } status.finished = true status.working = false @@ -1121,7 +1117,7 @@ func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) { // This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate, // or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started // should never have an activeUpdate because that is exposed to downstream components on started pods. 
-func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) { +func (p *podWorkers) startPodSync(parentCtx context.Context, podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) { p.podLock.Lock() defer p.podLock.Unlock() @@ -1129,18 +1125,18 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update status, ok := p.podSyncStatuses[podUID] if !ok { // pod status has disappeared, the worker should exit - klog.V(4).InfoS("Pod worker no longer has status, worker should exit", "podUID", podUID) return nil, update, false, false, false } + logger := klog.FromContext(parentCtx) if !status.working { // working is used by unit tests to observe whether a worker is currently acting on this pod - klog.V(4).InfoS("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID) + logger.V(4).Info("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID) } if status.pendingUpdate == nil { // no update available, this means we were queued without work being added or there is a // race condition, both of which are unexpected status.working = false - klog.V(4).InfoS("Pod worker received no pending work, programmer error?", "podUID", podUID) + logger.V(4).Info("Pod worker received no pending work, programmer error?", "podUID", podUID) return nil, update, false, false, false } @@ -1154,11 +1150,7 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update default: } - // initialize a context for the worker if one does not exist - if status.ctx == nil || status.ctx.Err() == context.Canceled { - status.ctx, status.cancelFn = context.WithCancel(context.Background()) - } - ctx = status.ctx + ctx, status.cancelFn = context.WithCancel(parentCtx) // if we are already started, make our state visible to downstream components if status.IsStarted() { @@ -1178,26 +1170,26 @@ func (p *podWorkers) 
startPodSync(podUID types.UID) (ctx context.Context, update // asked to start such a pod, but guard here just in case an accident occurs. if update.Options.Pod == nil { status.mergeLastUpdate(update.Options) - klog.V(4).InfoS("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType) + logger.V(4).Info("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType) return ctx, update, false, false, true } // verify we can start - canStart, canEverStart = p.allowPodStart(update.Options.Pod) + canStart, canEverStart = p.allowPodStart(logger, update.Options.Pod) switch { case !canEverStart: - p.cleanupUnstartedPod(update.Options.Pod, status) + p.cleanupUnstartedPod(logger, update.Options.Pod, status) status.working = false if start := update.Options.StartTime; !start.IsZero() { metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start)) } - klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType) + logger.V(4).Info("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType) return ctx, update, canStart, canEverStart, true case !canStart: // this is the only path we don't start the pod, so we need to put the change back in pendingUpdate status.pendingUpdate = &update.Options status.working = false - klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID) + logger.V(4).Info("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID) return ctx, update, canStart, canEverStart, true } @@ -1234,14 +1226,15 @@ func podUIDAndRefForUpdate(update UpdatePodOptions) (types.UID, klog.ObjectRef) // to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the // caller. 
When a pod transitions working->terminating or terminating->terminated, the next update is // queued immediately and no kubelet action is required. -func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) { +func (p *podWorkers) podWorkerLoop(parentCtx context.Context, podUID types.UID, podUpdates <-chan struct{}) { var lastSyncTime time.Time for range podUpdates { - ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID) + ctx, update, canStart, canEverStart, ok := p.startPodSync(parentCtx, podUID) // If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate. if !ok { continue } + logger := klog.FromContext(ctx) // If the pod was terminated prior to the pod being allowed to start, we exit the loop. if !canEverStart { return @@ -1253,7 +1246,7 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) podUID, podRef := podUIDAndRefForUpdate(update.Options) - klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) + logger.V(4).Info("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) var isTerminal bool err := func() error { // The worker is responsible for ensuring the sync method sees the appropriate @@ -1296,7 +1289,7 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) if opt := update.Options.KillPodOptions; opt != nil { gracePeriod = opt.PodTerminationGracePeriodSecondsOverride } - podStatusFn := p.acknowledgeTerminating(podUID) + podStatusFn := p.acknowledgeTerminating(logger, podUID) // if we only have a running pod, terminate it directly if update.Options.RunningPod != nil { @@ -1317,55 +1310,55 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) switch { case err == context.Canceled: // when the context is cancelled we expect an update to already be queued - klog.V(2).InfoS("Sync exited with context 
cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) + logger.V(2).Info("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) case err != nil: // we will queue a retry - klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID) + logger.Error(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID) case update.WorkType == TerminatedPod: // we can shut down the worker - p.completeTerminated(podUID) + p.completeTerminated(logger, podUID) if start := update.Options.StartTime; !start.IsZero() { metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start)) } - klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) + logger.V(4).Info("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) return case update.WorkType == TerminatingPod: // pods that don't exist in config don't need to be terminated, other loops will clean them up if update.Options.RunningPod != nil { - p.completeTerminatingRuntimePod(podUID) + p.completeTerminatingRuntimePod(logger, podUID) if start := update.Options.StartTime; !start.IsZero() { metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start)) } - klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) + logger.V(4).Info("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) return } // otherwise we move to the terminating phase - p.completeTerminating(podUID) + p.completeTerminating(logger, podUID) phaseTransition = true case isTerminal: // if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating - klog.V(4).InfoS("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) - 
p.completeSync(podUID) + logger.V(4).Info("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) + p.completeSync(logger, podUID) phaseTransition = true } // queue a retry if necessary, then put the next event in the channel if any - p.completeWork(podUID, phaseTransition, err) + p.completeWork(logger, podUID, phaseTransition, err) if start := update.Options.StartTime; !start.IsZero() { metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start)) } - klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) + logger.V(4).Info("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) } } // acknowledgeTerminating sets the terminating flag on the pod status once the pod worker sees // the termination state so that other components know no new containers will be started in this // pod. It then returns the status function, if any, that applies to this pod. -func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc { +func (p *podWorkers) acknowledgeTerminating(logger klog.Logger, podUID types.UID) PodStatusFunc { p.podLock.Lock() defer p.podLock.Unlock() @@ -1375,7 +1368,7 @@ func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc { } if !status.terminatingAt.IsZero() && !status.startedTerminating { - klog.V(4).InfoS("Pod worker has observed request to terminate", "podUID", podUID) + logger.V(4).Info("Pod worker has observed request to terminate", "podUID", podUID) status.startedTerminating = true } @@ -1389,15 +1382,15 @@ func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc { // be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways // exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by // UpdatePod. 
-func (p *podWorkers) completeSync(podUID types.UID) { +func (p *podWorkers) completeSync(logger klog.Logger, podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() - klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID) + logger.V(4).Info("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID) status, ok := p.podSyncStatuses[podUID] if !ok { - klog.V(4).InfoS("Pod had no status in completeSync, programmer error?", "podUID", podUID) + logger.V(4).Info("Pod had no status in completeSync, programmer error?", "podUID", podUID) return } @@ -1405,7 +1398,7 @@ func (p *podWorkers) completeSync(podUID types.UID) { if status.terminatingAt.IsZero() { status.terminatingAt = p.clock.Now() } else { - klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID) + logger.V(4).Info("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID) } status.startedTerminating = true @@ -1418,11 +1411,11 @@ func (p *podWorkers) completeSync(podUID types.UID) { // no container is running, no container will be started in the future, and we are ready for // cleanup. This updates the termination state which prevents future syncs and will ensure // other kubelet loops know this pod is not running any containers. 
-func (p *podWorkers) completeTerminating(podUID types.UID) { +func (p *podWorkers) completeTerminating(logger klog.Logger, podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() - klog.V(4).InfoS("Pod terminated all containers successfully", "podUID", podUID) + logger.V(4).Info("Pod terminated all containers successfully", "podUID", podUID) status, ok := p.podSyncStatuses[podUID] if !ok { @@ -1431,7 +1424,7 @@ func (p *podWorkers) completeTerminating(podUID types.UID) { // update the status of the pod if status.terminatingAt.IsZero() { - klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID) + logger.V(4).Info("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID) } status.terminatedAt = p.clock.Now() for _, ch := range status.notifyPostTerminating { @@ -1449,11 +1442,11 @@ func (p *podWorkers) completeTerminating(podUID types.UID) { // which means an orphaned pod (no config) is terminated and we can exit. Since orphaned // pods have no API representation, we want to exit the loop at this point and ensure no // status is present afterwards - the running pod is truly terminated when this is invoked. 
-func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) { +func (p *podWorkers) completeTerminatingRuntimePod(logger klog.Logger, podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() - klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID) + logger.V(4).Info("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID) p.cleanupPodUpdates(podUID) @@ -1462,7 +1455,7 @@ func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) { return } if status.terminatingAt.IsZero() { - klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID) + logger.V(4).Info("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID) } status.terminatedAt = p.clock.Now() status.finished = true @@ -1479,11 +1472,11 @@ func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) { // completeTerminated is invoked after syncTerminatedPod completes successfully and means we // can stop the pod worker. The pod is finalized at this point. 
-func (p *podWorkers) completeTerminated(podUID types.UID) { +func (p *podWorkers) completeTerminated(logger klog.Logger, podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() - klog.V(4).InfoS("Pod is complete and the worker can now stop", "podUID", podUID) + logger.V(4).Info("Pod is complete and the worker can now stop", "podUID", podUID) p.cleanupPodUpdates(podUID) @@ -1492,10 +1485,10 @@ func (p *podWorkers) completeTerminated(podUID types.UID) { return } if status.terminatingAt.IsZero() { - klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID) + logger.V(4).Info("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID) } if status.terminatedAt.IsZero() { - klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID) + logger.V(4).Info("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID) } status.finished = true status.working = false @@ -1507,7 +1500,7 @@ func (p *podWorkers) completeTerminated(podUID types.UID) { // completeWork requeues on error or the next sync interval and then immediately executes any pending // work. -func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncErr error) { +func (p *podWorkers) completeWork(logger klog.Logger, podUID types.UID, phaseTransition bool, syncErr error) { // Requeue the last update if the last sync returned error. 
switch { case phaseTransition: @@ -1541,9 +1534,9 @@ func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncEr if status.pendingUpdate != nil { select { case p.podUpdates[podUID] <- struct{}{}: - klog.V(4).InfoS("Requeuing pod due to pending update", "podUID", podUID) + logger.V(4).Info("Requeuing pod due to pending update", "podUID", podUID) default: - klog.V(4).InfoS("Pending update already queued", "podUID", podUID) + logger.V(4).Info("Pending update already queued", "podUID", podUID) } } else { status.working = false @@ -1559,7 +1552,7 @@ func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncEr // of known workers that are not finished with a value of SyncPodTerminated, // SyncPodKill, or SyncPodSync depending on whether the pod is terminated, terminating, // or syncing. -func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync { +func (p *podWorkers) SyncKnownPods(logger klog.Logger, desiredPods []*v1.Pod) map[types.UID]PodWorkerSync { workers := make(map[types.UID]PodWorkerSync) known := make(map[types.UID]struct{}) for _, pod := range desiredPods { @@ -1588,7 +1581,7 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke _, knownPod := known[uid] orphan := !knownPod if status.restartRequested || orphan { - if p.removeTerminatedWorker(uid, status, orphan) { + if p.removeTerminatedWorker(logger, uid, status, orphan) { // no worker running, we won't return it continue } @@ -1622,11 +1615,11 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke // terminated pods to prevent accidentally restarting a terminal pod, which is // proportional to the number of pods described in the pod config. The method // returns true if the worker was completely removed. 
-func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus, orphaned bool) bool { +func (p *podWorkers) removeTerminatedWorker(logger klog.Logger, uid types.UID, status *podSyncStatus, orphaned bool) bool { if !status.finished { // If the pod worker has not reached terminal state and the pod is still known, we wait. if !orphaned { - klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid) + logger.V(4).Info("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid) return false } @@ -1640,7 +1633,7 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus case !status.IsStarted() && !status.observedRuntime: // The pod has not been started, which means we can safely clean up the pod - the // pod worker will shutdown as a result of this change without executing a sync. - klog.V(4).InfoS("Pod is orphaned and has not been started", "podUID", uid) + logger.V(4).Info("Pod is orphaned and has not been started", "podUID", uid) case !status.IsTerminationRequested(): // The pod has been started but termination has not been requested - set the appropriate // timestamp and notify the pod worker. Because the pod has been synced at least once, @@ -1652,22 +1645,22 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus status.gracePeriod = 1 } p.requeueLastPodUpdate(uid, status) - klog.V(4).InfoS("Pod is orphaned and still running, began terminating", "podUID", uid) + logger.V(4).Info("Pod is orphaned and still running, began terminating", "podUID", uid) return false default: // The pod is already moving towards termination, notify the pod worker. Because the pod // has been synced at least once, the value of status.activeUpdate will be the fallback for // the next sync. 
p.requeueLastPodUpdate(uid, status) - klog.V(4).InfoS("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid) + logger.V(4).Info("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid) return false } } if status.restartRequested { - klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid) + logger.V(4).Info("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid) } else { - klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid) + logger.V(4).Info("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid) } delete(p.podSyncStatuses, uid) p.cleanupPodUpdates(uid) @@ -1680,7 +1673,7 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus // killPodNow returns a KillPodFunc that can be used to kill a pod. // It is intended to be injected into other modules that need to kill a pod. 
-func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc { +func killPodNow(logger klog.Logger, podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc { return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error { // determine the grace period to use when killing the pod gracePeriod := int64(0) @@ -1701,16 +1694,19 @@ func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.K // open a channel we block against until we get a result ch := make(chan struct{}, 1) - podWorkers.UpdatePod(UpdatePodOptions{ - Pod: pod, - UpdateType: kubetypes.SyncPodKill, - KillPodOptions: &KillPodOptions{ - CompletedCh: ch, - Evict: isEvicted, - PodStatusFunc: statusFn, - PodTerminationGracePeriodSecondsOverride: gracePeriodOverride, - }, - }) + podWorkers.UpdatePod( + // KillPodFunc interface does not provide a context parameter. + klog.NewContext(context.TODO(), logger), + UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodKill, + KillPodOptions: &KillPodOptions{ + CompletedCh: ch, + Evict: isEvicted, + PodStatusFunc: statusFn, + PodTerminationGracePeriodSecondsOverride: gracePeriodOverride, + }, + }) // wait for either a response, or a timeout select { diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 4dc418744a0..0af256d8f3a 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -34,12 +34,14 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/allocation" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/queue" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/clock" 
clocktesting "k8s.io/utils/clock/testing" ) @@ -66,7 +68,7 @@ type fakePodWorkers struct { terminatingStaticPods map[string]bool } -func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) { +func (f *fakePodWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { f.lock.Lock() defer f.lock.Unlock() var uid types.UID @@ -86,7 +88,7 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) { case kubetypes.SyncPodKill: f.triggeredDeletion = append(f.triggeredDeletion, uid) default: - isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status) + isTerminal, err := f.syncPodFn(ctx, options.UpdateType, options.Pod, options.MirrorPod, status) if err != nil { f.t.Errorf("Unexpected error: %v", err) } @@ -96,7 +98,7 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) { } } -func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync { +func (f *fakePodWorkers) SyncKnownPods(_ klog.Logger, desiredPods []*v1.Pod) map[types.UID]PodWorkerSync { return map[types.UID]PodWorkerSync{} } @@ -261,11 +263,11 @@ type timeIncrementingWorkers struct { // UpdatePod increments the clock after UpdatePod is called, but before the workers // are invoked, and then drains all workers before returning. The provided functions // are invoked while holding the lock to prevent workers from receiving updates. -func (w *timeIncrementingWorkers) UpdatePod(options UpdatePodOptions, afterFns ...func()) { +func (w *timeIncrementingWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions, afterFns ...func()) { func() { w.lock.Lock() defer w.lock.Unlock() - w.w.UpdatePod(options) + w.w.UpdatePod(ctx, options) w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second)) for _, fn := range afterFns { fn() @@ -276,11 +278,11 @@ func (w *timeIncrementingWorkers) UpdatePod(options UpdatePodOptions, afterFns . 
// SyncKnownPods increments the clock after SyncKnownPods is called, but before the workers // are invoked, and then drains all workers before returning. -func (w *timeIncrementingWorkers) SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) { +func (w *timeIncrementingWorkers) SyncKnownPods(logger klog.Logger, desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) { func() { w.lock.Lock() defer w.lock.Unlock() - knownPods = w.w.SyncKnownPods(desiredPods) + knownPods = w.w.SyncKnownPods(logger, desiredPods) w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second)) }() w.drainUnpausedWorkers() @@ -369,8 +371,8 @@ func (w *timeIncrementingWorkers) tick() { // createTimeIncrementingPodWorkers will guarantee that each call to UpdatePod and each worker goroutine invocation advances the clock by one second, // although multiple workers will advance the clock in an unpredictable order. Use to observe // successive internal updates to each update pod state when only a single pod is being updated. 
-func createTimeIncrementingPodWorkers() (*timeIncrementingWorkers, map[types.UID][]syncPodRecord) { - nested, runtime, processed := createPodWorkers() +func createTimeIncrementingPodWorkers(logger klog.Logger) (*timeIncrementingWorkers, map[types.UID][]syncPodRecord) { + nested, runtime, processed := createPodWorkers(logger) w := &timeIncrementingWorkers{ w: nested, runtime: runtime, @@ -392,7 +394,11 @@ func createTimeIncrementingPodWorkers() (*timeIncrementingWorkers, map[types.UID return w, processed } -func createPodWorkers() (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) { +func createPodWorkers(logger klog.Logger) (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) { + return createPodWorkersWithLogger(logger) +} + +func createPodWorkersWithLogger(_ klog.Logger) (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) { lock := sync.Mutex{} processed := make(map[types.UID][]syncPodRecord) fakeRecorder := &record.FakeRecorder{} @@ -524,12 +530,13 @@ func drainAllWorkers(podWorkers *podWorkers) { } func TestUpdatePodParallel(t *testing.T) { - podWorkers, _, processed := createPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _, processed := createPodWorkers(logger) numPods := 20 for i := 0; i < numPods; i++ { for j := i; j < numPods; j++ { - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod(strconv.Itoa(j), "ns", strconv.Itoa(i), false), UpdateType: kubetypes.SyncPodCreate, }) @@ -558,8 +565,8 @@ func TestUpdatePodParallel(t *testing.T) { func TestUpdatePod(t *testing.T) { one := int64(1) - hasContext := func(status *podSyncStatus) *podSyncStatus { - status.ctx, status.cancelFn = context.Background(), func() {} + hasCancelFn := func(status *podSyncStatus) *podSyncStatus { + status.cancelFn = func() {} return status } withLabel := func(pod *v1.Pod, label, value string) *v1.Pod { @@ -581,11 +588,16 @@ func TestUpdatePod(t 
*testing.T) { t.Helper() // handle special non-comparable fields if status != nil { - if e, a := expected.ctx != nil, status.ctx != nil; e != a { - t.Errorf("expected context %t, has context %t", e, a) - } else { - expected.ctx, status.ctx = nil, nil + clearLogger := func(opts *UpdatePodOptions) { + if opts == nil { + return + } } + clearLogger(expected.pendingUpdate) + clearLogger(expected.activeUpdate) + clearLogger(status.pendingUpdate) + clearLogger(status.activeUpdate) + if e, a := expected.cancelFn != nil, status.cancelFn != nil; e != a { t.Errorf("expected cancelFn %t, has cancelFn %t", e, a) } else { @@ -600,7 +612,7 @@ func TestUpdatePod(t *testing.T) { name string update UpdatePodOptions runtimeStatus *kubecontainer.PodStatus - prepare func(t *testing.T, w *timeIncrementingWorkers) (afterUpdateFn func()) + prepare func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) (afterUpdateFn func()) expect *podSyncStatus expectBeforeWorker *podSyncStatus @@ -612,7 +624,7 @@ func TestUpdatePod(t *testing.T) { UpdateType: kubetypes.SyncPodCreate, Pod: newNamedPod("1", "ns", "running-pod", false), }, - expect: hasContext(&podSyncStatus{ + expect: hasCancelFn(&podSyncStatus{ fullname: "running-pod_ns", syncedAt: time.Unix(1, 0), startedAt: time.Unix(3, 0), @@ -627,19 +639,19 @@ func TestUpdatePod(t *testing.T) { UpdateType: kubetypes.SyncPodCreate, Pod: withLabel(newNamedPod("1", "ns", "running-pod", false), "updated", "value"), }, - prepare: func(t *testing.T, w *timeIncrementingWorkers) func() { - w.UpdatePod(UpdatePodOptions{ + prepare: func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) func() { + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, Pod: newNamedPod("1", "ns", "running-pod", false), }) w.PauseWorkers("1") - w.UpdatePod(UpdatePodOptions{ + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, Pod: newNamedPod("1", "ns", "running-pod", false), }) return func() { 
w.ReleaseWorkersUnderLock("1") } }, - expect: hasContext(&podSyncStatus{ + expect: hasCancelFn(&podSyncStatus{ fullname: "running-pod_ns", syncedAt: time.Unix(1, 0), startedAt: time.Unix(3, 0), @@ -663,7 +675,7 @@ func TestUpdatePod(t *testing.T) { Pod: newNamedPod("1", "ns", "running-pod", false), RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"}, }, - expect: hasContext(&podSyncStatus{ + expect: hasCancelFn(&podSyncStatus{ fullname: "running-pod_ns", syncedAt: time.Unix(1, 0), startedAt: time.Unix(3, 0), @@ -678,14 +690,14 @@ func TestUpdatePod(t *testing.T) { UpdateType: kubetypes.SyncPodUpdate, Pod: withDeletionTimestamp(newNamedPod("1", "ns", "running-pod", false), time.Unix(1, 0), intp(15)), }, - prepare: func(t *testing.T, w *timeIncrementingWorkers) func() { - w.UpdatePod(UpdatePodOptions{ + prepare: func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) func() { + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, Pod: newNamedPod("1", "ns", "running-pod", false), }) return nil }, - expect: hasContext(&podSyncStatus{ + expect: hasCancelFn(&podSyncStatus{ fullname: "running-pod_ns", syncedAt: time.Unix(1, 0), startedAt: time.Unix(3, 0), @@ -709,14 +721,14 @@ func TestUpdatePod(t *testing.T) { Pod: newNamedPod("1", "ns", "running-pod", false), KillPodOptions: &KillPodOptions{Evict: true}, }, - prepare: func(t *testing.T, w *timeIncrementingWorkers) func() { - w.UpdatePod(UpdatePodOptions{ + prepare: func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) func() { + w.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, Pod: newNamedPod("1", "ns", "running-pod", false), }) return nil }, - expect: hasContext(&podSyncStatus{ + expect: hasCancelFn(&podSyncStatus{ fullname: "running-pod_ns", syncedAt: time.Unix(1, 0), startedAt: time.Unix(3, 0), @@ -742,7 +754,7 @@ func TestUpdatePod(t *testing.T) { UpdateType: kubetypes.SyncPodCreate, Pod: newPodWithPhase("1", 
"done-pod", v1.PodSucceeded), }, - expect: hasContext(&podSyncStatus{ + expect: hasCancelFn(&podSyncStatus{ fullname: "done-pod_ns", syncedAt: time.Unix(1, 0), terminatingAt: time.Unix(1, 0), @@ -778,7 +790,7 @@ func TestUpdatePod(t *testing.T) { startedTerminating: true, working: true, }, - expect: hasContext(&podSyncStatus{ + expect: hasCancelFn(&podSyncStatus{ fullname: "done-pod_ns", syncedAt: time.Unix(1, 0), terminatingAt: time.Unix(1, 0), @@ -843,7 +855,8 @@ func TestUpdatePod(t *testing.T) { var fns []func() - podWorkers, _ := createTimeIncrementingPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _ := createTimeIncrementingPodWorkers(logger) if tc.expectBeforeWorker != nil { fns = append(fns, func() { @@ -852,7 +865,7 @@ func TestUpdatePod(t *testing.T) { } if tc.prepare != nil { - if fn := tc.prepare(t, podWorkers); fn != nil { + if fn := tc.prepare(t, tCtx, podWorkers); fn != nil { fns = append(fns, fn) } } @@ -871,7 +884,7 @@ func TestUpdatePod(t *testing.T) { podWorkers.runtime.Err = nil }) - podWorkers.UpdatePod(tc.update, fns...) + podWorkers.UpdatePod(tCtx, tc.update, fns...) 
if podWorkers.w.IsPodKnownTerminated(uid) != tc.expectKnownTerminated { t.Errorf("podWorker.IsPodKnownTerminated expected to be %t", tc.expectKnownTerminated) @@ -997,7 +1010,8 @@ func TestCompleteWork_Enqueue(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - podWorkers, _, _ := createPodWorkers() + logger, _ := ktesting.NewTestContext(t) + podWorkers, _, _ := createPodWorkers(logger) podWorkers.clock = clock fakeQueue := podWorkers.workQueue.(*fakeQueue) podUID := types.UID("12345") @@ -1005,7 +1019,7 @@ func TestCompleteWork_Enqueue(t *testing.T) { podWorkers.resyncInterval = resyncInterval podWorkers.backOffPeriod = defaultBackoff podWorkers.podSyncStatuses[podUID] = &podSyncStatus{} - podWorkers.completeWork(podUID, tc.phaseTransition, tc.syncErr) + podWorkers.completeWork(logger, podUID, tc.phaseTransition, tc.syncErr) if fakeQueue.Empty() { t.Fatalf("work queue should not be empty") @@ -1031,13 +1045,14 @@ func TestCompleteWork_PendingUpdate(t *testing.T) { podUID := types.UID("pod-with-pending-update-check") t.Run("with nil pendingUpdate, clears working status", func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) p := &podWorkers{ podSyncStatuses: make(map[types.UID]*podSyncStatus), workQueue: &fakeQueue{}, } p.podSyncStatuses[podUID] = &podSyncStatus{working: true, pendingUpdate: nil} - p.completeWork(podUID, false, nil) + p.completeWork(logger, podUID, false, nil) p.podLock.Lock() defer p.podLock.Unlock() @@ -1047,6 +1062,7 @@ func TestCompleteWork_PendingUpdate(t *testing.T) { }) t.Run("with non-nil pendingUpdate, queues an update signal", func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) p := &podWorkers{ podSyncStatuses: make(map[types.UID]*podSyncStatus), podUpdates: make(map[types.UID]chan struct{}), @@ -1059,7 +1075,7 @@ func TestCompleteWork_PendingUpdate(t *testing.T) { } p.podSyncStatuses[podUID] = &podSyncStatus{working: true, pendingUpdate: dummyUpdate} - p.completeWork(podUID, false, 
nil) + p.completeWork(logger, podUID, false, nil) select { case <-p.podUpdates[podUID]: @@ -1071,10 +1087,11 @@ func TestCompleteWork_PendingUpdate(t *testing.T) { } func TestUpdatePodForRuntimePod(t *testing.T) { - podWorkers, _, processed := createPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _, processed := createPodWorkers(logger) // ignores running pod of wrong sync type - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"}, }) @@ -1084,7 +1101,7 @@ func TestUpdatePodForRuntimePod(t *testing.T) { } // creates synthetic pod - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"}, }) @@ -1102,7 +1119,8 @@ func TestUpdatePodForRuntimePod(t *testing.T) { } func TestUpdatePodForTerminatedRuntimePod(t *testing.T) { - podWorkers, _, processed := createPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _, processed := createPodWorkers(logger) now := time.Now() podWorkers.podSyncStatuses[types.UID("1")] = &podSyncStatus{ @@ -1113,7 +1131,7 @@ func TestUpdatePodForTerminatedRuntimePod(t *testing.T) { } // creates synthetic pod - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ UpdateType: kubetypes.SyncPodKill, RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"}, }) @@ -1128,19 +1146,20 @@ func TestUpdatePodForTerminatedRuntimePod(t *testing.T) { } func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) { - podWorkers, _, processed := createPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _, processed := createPodWorkers(logger) numPods := 20 for i := 0; i < numPods; i++ { pod := newNamedPod(strconv.Itoa(i), "ns", strconv.Itoa(i), false) - 
podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodCreate, }) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodKill, }) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodUpdate, }) @@ -1213,14 +1232,15 @@ func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync { } func TestTerminalPhaseTransition(t *testing.T) { - podWorkers, _, _ := createPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _, _ := createPodWorkers(logger) var channels WorkChannel podWorkers.workerChannelFn = channels.Intercept terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.podSyncer.(*podSyncerFuncs).syncPod) podWorkers.podSyncer.(*podSyncerFuncs).syncPod = terminalPhaseSyncer.SyncPod // start pod - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("1", "test1", "pod1", false), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1233,7 +1253,7 @@ func TestTerminalPhaseTransition(t *testing.T) { } // send another update to the pod - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("1", "test1", "pod1", false), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1247,7 +1267,7 @@ func TestTerminalPhaseTransition(t *testing.T) { // the next sync should result in a transition to terminal terminalPhaseSyncer.SetTerminal(types.UID("1")) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("1", "test1", "pod1", false), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1265,7 +1285,8 @@ func TestStaticPodExclusion(t *testing.T) { t.Skip("skipping test in short mode.") } - podWorkers, _, processed := createPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _, processed := 
createPodWorkers(logger) var channels WorkChannel podWorkers.workerChannelFn = channels.Intercept @@ -1275,11 +1296,11 @@ func TestStaticPodExclusion(t *testing.T) { } // start two pods with the same name, one static, one apiserver - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("1-normal", "test1", "pod1", false), UpdateType: kubetypes.SyncPodUpdate, }) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("2-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1308,11 +1329,11 @@ func TestStaticPodExclusion(t *testing.T) { } // attempt to start a second and third static pod, which should not start - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("3-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("4-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1367,7 +1388,7 @@ func TestStaticPodExclusion(t *testing.T) { // send a basic update for 3-static podWorkers.workQueue.GetWork() - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("3-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1387,7 +1408,7 @@ func TestStaticPodExclusion(t *testing.T) { // mark 3-static as deleted while 2-static is still running podWorkers.workQueue.GetWork() - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("3-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodKill, }) @@ -1412,7 +1433,7 @@ func TestStaticPodExclusion(t *testing.T) { } // terminate 2-static - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("2-static", "test1", "pod1", true), UpdateType: 
kubetypes.SyncPodKill, }) @@ -1431,7 +1452,7 @@ func TestStaticPodExclusion(t *testing.T) { } // simulate a periodic event from the work queue for 4-static - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("4-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1450,7 +1471,7 @@ func TestStaticPodExclusion(t *testing.T) { } // initiate a sync with all pods remaining - state := podWorkers.SyncKnownPods([]*v1.Pod{ + state := podWorkers.SyncKnownPods(logger, []*v1.Pod{ newNamedPod("1-normal", "test1", "pod1", false), newNamedPod("2-static", "test1", "pod1", true), newNamedPod("3-static", "test1", "pod1", true), @@ -1473,7 +1494,7 @@ func TestStaticPodExclusion(t *testing.T) { } // initiate a sync with 3-static removed - state = podWorkers.SyncKnownPods([]*v1.Pod{ + state = podWorkers.SyncKnownPods(logger, []*v1.Pod{ newNamedPod("1-normal", "test1", "pod1", false), newNamedPod("2-static", "test1", "pod1", true), newNamedPod("4-static", "test1", "pod1", true), @@ -1494,18 +1515,18 @@ func TestStaticPodExclusion(t *testing.T) { // start a static pod, kill it, then add another one, but ensure the pod worker // for pod 5 doesn't see the kill event (so it remains waiting to start) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("5-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) // Wait for the previous work to be delivered to the worker drainAllWorkers(podWorkers) channels.Channel("5-static").Hold() - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("5-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodKill, }) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("6-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1524,12 +1545,12 @@ func TestStaticPodExclusion(t 
*testing.T) { } // terminate 4-static and wake 6-static - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("4-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodKill, }) drainWorkersExcept(podWorkers, "5-static") - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("6-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1564,30 +1585,30 @@ func TestStaticPodExclusion(t *testing.T) { // start three more static pods, kill the previous static pod blocking start, // and simulate the second pod of three (8) getting to run first - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("7-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("8-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("9-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) drainAllWorkers(podWorkers) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("6-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodKill, }) drainAllWorkers(podWorkers) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("6-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodCreate, }) drainAllWorkers(podWorkers) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("8-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1623,12 +1644,12 @@ func TestStaticPodExclusion(t *testing.T) { } // terminate 7-static and wake 8-static - podWorkers.UpdatePod(UpdatePodOptions{ + 
podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("7-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodKill, }) drainAllWorkers(podWorkers) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod("8-static", "test1", "pod1", true), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1642,7 +1663,7 @@ func TestStaticPodExclusion(t *testing.T) { } // initiate a sync with all but 8-static pods undesired - state = podWorkers.SyncKnownPods([]*v1.Pod{ + state = podWorkers.SyncKnownPods(logger, []*v1.Pod{ newNamedPod("8-static", "test1", "pod1", true), }) drainAllWorkers(podWorkers) @@ -1741,11 +1762,12 @@ func (w *WorkChannel) Intercept(uid types.UID, ch chan struct{}) (outCh <-chan s } func TestSyncKnownPods(t *testing.T) { - podWorkers, _, _ := createPodWorkers() + logger, tCtx := ktesting.NewTestContext(t) + podWorkers, _, _ := createPodWorkers(logger) numPods := 20 for i := 0; i < numPods; i++ { - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: newNamedPod(strconv.Itoa(i), "ns", "name", false), UpdateType: kubetypes.SyncPodUpdate, }) @@ -1771,7 +1793,7 @@ func TestSyncKnownPods(t *testing.T) { now := metav1.Now() pod.DeletionTimestamp = &now } - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodKill, }) @@ -1824,7 +1846,7 @@ func TestSyncKnownPods(t *testing.T) { t.Errorf("Expected pod to not be suitable for removal (does not exist but not yet synced)") } - podWorkers.SyncKnownPods(desiredPodList) + podWorkers.SyncKnownPods(logger, desiredPodList) if len(podWorkers.podUpdates) != 2 { t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) } @@ -1850,7 +1872,7 @@ func TestSyncKnownPods(t *testing.T) { // verify workers that are not terminated stay open even if config no longer // sees them - podWorkers.SyncKnownPods(nil) + podWorkers.SyncKnownPods(logger, nil) 
drainAllWorkers(podWorkers) if len(podWorkers.podUpdates) != 0 { t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) @@ -1861,7 +1883,7 @@ func TestSyncKnownPods(t *testing.T) { for uid := range desiredPods { pod := newNamedPod(string(uid), "ns", "name", false) - podWorkers.UpdatePod(UpdatePodOptions{ + podWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodKill, }) @@ -1869,7 +1891,7 @@ func TestSyncKnownPods(t *testing.T) { drainWorkers(podWorkers, numPods) // verify once those pods terminate (via some other flow) the workers are cleared - podWorkers.SyncKnownPods(nil) + podWorkers.SyncKnownPods(logger, nil) if len(podWorkers.podUpdates) != 0 { t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) } @@ -2048,7 +2070,16 @@ func Test_removeTerminatedWorker(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - podWorkers, _, _ := createPodWorkers() + normalizeUpdatePodOptions := func(opts *UpdatePodOptions) *UpdatePodOptions { + if opts == nil { + return nil + } + normalized := *opts + return &normalized + } + + logger, _ := ktesting.NewTestContext(t) + podWorkers, _, _ := createPodWorkers(logger) podWorkers.podSyncStatuses[podUID] = tc.podSyncStatus podWorkers.podUpdates[podUID] = make(chan struct{}, 1) if tc.podSyncStatus.working { @@ -2057,7 +2088,7 @@ func Test_removeTerminatedWorker(t *testing.T) { podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname - podWorkers.removeTerminatedWorker(podUID, podWorkers.podSyncStatuses[podUID], tc.orphan) + podWorkers.removeTerminatedWorker(logger, podUID, podWorkers.podSyncStatuses[podUID], tc.orphan) status, exists := podWorkers.podSyncStatuses[podUID] if tc.removed && exists { t.Fatalf("Expected pod worker to be removed") @@ -2071,8 +2102,10 @@ func Test_removeTerminatedWorker(t *testing.T) { if 
tc.expectGracePeriod > 0 && status.gracePeriod != tc.expectGracePeriod { t.Errorf("Unexpected grace period %d", status.gracePeriod) } - if !reflect.DeepEqual(tc.expectPending, status.pendingUpdate) { - t.Errorf("Unexpected pending: %s", cmp.Diff(tc.expectPending, status.pendingUpdate)) + expectedPending := normalizeUpdatePodOptions(tc.expectPending) + actualPending := normalizeUpdatePodOptions(status.pendingUpdate) + if !reflect.DeepEqual(expectedPending, actualPending) { + t.Errorf("Unexpected pending: %s", cmp.Diff(expectedPending, actualPending)) } if tc.expectPending != nil { if !status.working { @@ -2119,6 +2152,7 @@ func (kl *simpleFakeKubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, // TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers // for their invocation of the syncPodFn. func TestFakePodWorkers(t *testing.T) { + tCtx := ktesting.Init(t) fakeRecorder := &record.FakeRecorder{} fakeRuntime := &containertest.FakeRuntime{} fakeCache := containertest.NewFakeCache(fakeRuntime) @@ -2163,12 +2197,12 @@ func TestFakePodWorkers(t *testing.T) { for i, tt := range tests { kubeletForRealWorkers.wg.Add(1) - realPodWorkers.UpdatePod(UpdatePodOptions{ + realPodWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: tt.pod, MirrorPod: tt.mirrorPod, UpdateType: kubetypes.SyncPodUpdate, }) - fakePodWorkers.UpdatePod(UpdatePodOptions{ + fakePodWorkers.UpdatePod(tCtx, UpdatePodOptions{ Pod: tt.pod, MirrorPod: tt.mirrorPod, UpdateType: kubetypes.SyncPodUpdate, @@ -2193,8 +2227,9 @@ func TestFakePodWorkers(t *testing.T) { // TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected. 
func TestKillPodNowFunc(t *testing.T) { fakeRecorder := &record.FakeRecorder{} - podWorkers, _, processed := createPodWorkers() - killPodFunc := killPodNow(podWorkers, fakeRecorder) + logger, _ := ktesting.NewTestContext(t) + podWorkers, _, processed := createPodWorkers(logger) + killPodFunc := killPodNow(logger, podWorkers, fakeRecorder) pod := newNamedPod("test", "ns", "test", false) gracePeriodOverride := int64(0) err := killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) { @@ -2525,7 +2560,8 @@ func Test_allowPodStart(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - podWorkers, _, _ := createPodWorkers() + logger, _ := ktesting.NewTestContext(t) + podWorkers, _, _ := createPodWorkers(logger) if tc.podSyncStatuses != nil { podWorkers.podSyncStatuses = tc.podSyncStatuses } @@ -2535,7 +2571,7 @@ func Test_allowPodStart(t *testing.T) { if tc.waitingToStartStaticPodsByFullname != nil { podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname } - allowed, allowedEver := podWorkers.allowPodStart(tc.pod) + allowed, allowedEver := podWorkers.allowPodStart(logger, tc.pod) if allowed != tc.allowed { if tc.allowed { t.Errorf("Pod should be allowed") From 6bccc051ebee1123675f6ba862858918a674f4ef Mon Sep 17 00:00:00 2001 From: hoteye Date: Fri, 13 Feb 2026 09:54:54 +0800 Subject: [PATCH 2/5] hack: add temporary kubelet contextual logcheck carve-outs Keep broad kubelet contextual logcheck coverage while excluding a small set of legacy files that still use non-contextual logging patterns. These carve-outs are temporary and will be removed in follow-up kubelet contextual logging PRs. 
--- hack/golangci-hints.yaml | 6 ++++++ hack/golangci.yaml | 6 ++++++ hack/logcheck.conf | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index f0292cb0732..47ca7baa9f7 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -323,6 +323,12 @@ linters: contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/test/images/sample-device-plugin/.* contextual k8s.io/kubernetes/pkg/kubelet/.* + # Temporary carve-outs for PR1 split safety: these files still have legacy global klog calls. + # TODO: remove each exclusion as follow-up PRs migrate the file to contextual logging. + -contextual k8s.io/kubernetes/pkg/kubelet/kubelet_resources.go + -contextual k8s.io/kubernetes/pkg/kubelet/kubelet_volumes.go + -contextual k8s.io/kubernetes/pkg/kubelet/pod_container_deletor.go + -contextual k8s.io/kubernetes/pkg/kubelet/volume_host.go # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/golangci.yaml b/hack/golangci.yaml index 0a1bf73b109..e75c535ea02 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -332,6 +332,12 @@ linters: contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/test/images/sample-device-plugin/.* contextual k8s.io/kubernetes/pkg/kubelet/.* + # Temporary carve-outs for PR1 split safety: these files still have legacy global klog calls. + # TODO: remove each exclusion as follow-up PRs migrate the file to contextual logging. + -contextual k8s.io/kubernetes/pkg/kubelet/kubelet_resources.go + -contextual k8s.io/kubernetes/pkg/kubelet/kubelet_volumes.go + -contextual k8s.io/kubernetes/pkg/kubelet/pod_container_deletor.go + -contextual k8s.io/kubernetes/pkg/kubelet/volume_host.go # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. 
Once it is GA, we can lift diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 930c039659d..ba4194e06ac 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -65,6 +65,12 @@ contextual k8s.io/kubernetes/pkg/securitycontext/.* contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/test/images/sample-device-plugin/.* contextual k8s.io/kubernetes/pkg/kubelet/.* +# Temporary carve-outs for PR1 split safety: these files still have legacy global klog calls. +# TODO: remove each exclusion as follow-up PRs migrate the file to contextual logging. +-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_resources.go +-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_volumes.go +-contextual k8s.io/kubernetes/pkg/kubelet/pod_container_deletor.go +-contextual k8s.io/kubernetes/pkg/kubelet/volume_host.go # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift From 81096981713938dba53290d82fc3cf700fd570ee Mon Sep 17 00:00:00 2001 From: hoteye Date: Fri, 13 Feb 2026 18:32:10 +0800 Subject: [PATCH 3/5] kubelet: use a single test context in pod status subtest Replace duplicate test context creation in kubelet_pods_test with a single ktesting.NewTestContext(t) call. Reuse logger and tCtx from that call to align with test context naming and usage guidelines. 
--- pkg/kubelet/kubelet_pods_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index f5852f6e69c..d0b17feb9f6 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -4857,12 +4857,11 @@ func Test_generateAPIPodStatus(t *testing.T) { for _, test := range tests { for _, enablePodReadyToStartContainersCondition := range []bool{false, true} { t.Run(test.name, func(t *testing.T) { - tCtx := ktesting.Init(t) + logger, tCtx := ktesting.NewTestContext(t) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodReadyToStartContainersCondition, enablePodReadyToStartContainersCondition) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet - logger, _ := ktesting.NewTestContext(t) kl.statusManager.SetPodStatus(logger, test.pod, test.previousStatus) for _, name := range test.unreadyContainer { kl.readinessManager.Set(kubecontainer.BuildContainerID("", findContainerStatusByName(test.expected, name).ContainerID), results.Failure, test.pod) From 5fdef09048f5674b208f64bcfe3cf44d6bb17377 Mon Sep 17 00:00:00 2001 From: hoteye Date: Fri, 20 Feb 2026 00:30:24 +0800 Subject: [PATCH 4/5] kubelet: pass upper-level context through killPodNow Switch killPodNow to take context.Context and pass the existing upper-level context into podWorkers.UpdatePod, instead of creating a TODO-based context from logger. 
--- pkg/kubelet/kubelet.go | 6 +++--- pkg/kubelet/kubelet_test.go | 4 ++-- pkg/kubelet/pod_workers.go | 5 ++--- pkg/kubelet/pod_workers_test.go | 4 ++-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0ef90c56c5f..756fe8b84ab 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1020,7 +1020,7 @@ func NewMainKubelet(ctx context.Context, // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, - killPodNow(logger, klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) + killPodNow(ctx, klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) klet.evictionManager = evictionManager @@ -1061,7 +1061,7 @@ func NewMainKubelet(ctx context.Context, handlers = append(handlers, klet.containerManager.GetAllocateResourcesPodAdmitHandler()) - criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getAllocatedPods, killPodNow(logger, klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) + criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getAllocatedPods, killPodNow(ctx, klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) handlers = append(handlers, lifecycle.NewPredicateAdmitHandler(klet.GetCachedNode, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)) // apply functional Option's for _, opt := range kubeDeps.Options { @@ -1100,7 +1100,7 @@ func NewMainKubelet(ctx context.Context, Recorder: kubeDeps.Recorder, NodeRef: nodeRef, GetPodsFunc: klet.GetActivePods, - KillPodFunc: killPodNow(logger, klet.podWorkers, kubeDeps.Recorder), + KillPodFunc: killPodNow(ctx, klet.podWorkers, kubeDeps.Recorder), SyncNodeStatusFunc: klet.syncNodeStatus, 
ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration, ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration, diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index ee3d90b4414..73d121fc6f3 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -394,7 +394,7 @@ func newTestKubeletWithImageList( } // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, - killPodNow(logger, kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) + killPodNow(tCtx, kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) kubelet.evictionManager = evictionManager handlers := []lifecycle.PodAdmitHandler{} @@ -406,7 +406,7 @@ func newTestKubeletWithImageList( Recorder: fakeRecorder, NodeRef: nodeRef, GetPodsFunc: kubelet.podManager.GetPods, - KillPodFunc: killPodNow(logger, kubelet.podWorkers, fakeRecorder), + KillPodFunc: killPodNow(tCtx, kubelet.podWorkers, fakeRecorder), SyncNodeStatusFunc: func(context.Context) {}, ShutdownGracePeriodRequested: 0, ShutdownGracePeriodCriticalPods: 0, diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 4822b8e99d1..8e90399a53f 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -1673,7 +1673,7 @@ func (p *podWorkers) removeTerminatedWorker(logger klog.Logger, uid types.UID, s // killPodNow returns a KillPodFunc that can be used to kill a pod. // It is intended to be injected into other modules that need to kill a pod. 
-func killPodNow(logger klog.Logger, podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc { +func killPodNow(ctx context.Context, podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc { return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error { // determine the grace period to use when killing the pod gracePeriod := int64(0) @@ -1695,8 +1695,7 @@ func killPodNow(logger klog.Logger, podWorkers PodWorkers, recorder record.Event // open a channel we block against until we get a result ch := make(chan struct{}, 1) podWorkers.UpdatePod( - // KillPodFunc interface does not provide a context parameter. - klog.NewContext(context.TODO(), logger), + ctx, UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodKill, diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 0af256d8f3a..e9802e391b0 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -2227,9 +2227,9 @@ func TestFakePodWorkers(t *testing.T) { // TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected. 
func TestKillPodNowFunc(t *testing.T) { fakeRecorder := &record.FakeRecorder{} - logger, _ := ktesting.NewTestContext(t) + logger, tCtx := ktesting.NewTestContext(t) podWorkers, _, processed := createPodWorkers(logger) - killPodFunc := killPodNow(logger, podWorkers, fakeRecorder) + killPodFunc := killPodNow(tCtx, podWorkers, fakeRecorder) pod := newNamedPod("test", "ns", "test", false) gracePeriodOverride := int64(0) err := killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) { From beefa128aaf51dfc6885c00375347c9fdb0ef0d4 Mon Sep 17 00:00:00 2001 From: hoteye Date: Sat, 21 Feb 2026 00:21:56 +0800 Subject: [PATCH 5/5] kubelet: reuse common UpdatePod log fields Introduced a local updateLogger with common key-value pairs (pod, podUID, updateType) after uid/ns/name are resolved, and removed those repeated fields from subsequent log calls. Note that the "Notifying pod of pending update" and "Cancelling current pod sync" messages previously logged pod, podUID and workType only; because they now go through updateLogger they additionally include updateType. Used a separate variable instead of reassigning logger to avoid affecting the goroutine closure used by defer logger.V(3).Info(...). --- pkg/kubelet/pod_workers.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 8e90399a53f..a5b00af8d74 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -773,6 +773,7 @@ func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name } + updateLogger := logger.WithValues("pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) p.podLock.Lock() defer p.podLock.Unlock() @@ -781,7 +782,7 @@ func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { now := p.clock.Now() status, ok := p.podSyncStatuses[uid] if !ok { - logger.V(4).Info("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(4).Info("Pod is being synced for the first time") firstTime =
true status = &podSyncStatus{ syncedAt: now, @@ -846,14 +847,14 @@ func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { if !firstTime && status.IsTerminationRequested() { if options.UpdateType == kubetypes.SyncPodCreate { status.restartRequested = true - logger.V(4).Info("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(4).Info("Pod is terminating but has been requested to restart with same UID, will be reconciled later") return } } // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping) if status.IsFinished() { - logger.V(4).Info("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(4).Info("Pod is finished processing, no further updates") return } @@ -862,25 +863,25 @@ func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { if !status.IsTerminationRequested() { switch { case isRuntimePod: - logger.V(4).Info("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(4).Info("Pod is orphaned and must be torn down") status.deleted = true status.terminatingAt = now becameTerminating = true case pod.DeletionTimestamp != nil: - logger.V(4).Info("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(4).Info("Pod is marked for graceful deletion, begin teardown") status.deleted = true status.terminatingAt = now becameTerminating = true case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded: - logger.V(4).Info("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + 
updateLogger.V(4).Info("Pod is in a terminal phase (success/failed), begin teardown") status.terminatingAt = now becameTerminating = true case options.UpdateType == kubetypes.SyncPodKill: if options.KillPodOptions != nil && options.KillPodOptions.Evict { - logger.V(4).Info("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(4).Info("Pod is being evicted by the kubelet, begin teardown") status.evicted = true } else { - logger.V(4).Info("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(4).Info("Pod is being removed by the kubelet, begin teardown") } status.terminatingAt = now becameTerminating = true @@ -895,7 +896,7 @@ func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { // due to housekeeping seeing an older cached version of the runtime pod simply ignore it until // after the pod worker completes. 
if isRuntimePod { - logger.V(3).Info("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) + updateLogger.V(3).Info("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated") return } @@ -979,14 +980,14 @@ func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) { status.pendingUpdate.Pod, _ = p.allocationManager.UpdatePodFromAllocation(options.Pod) } status.working = true - logger.V(4).Info("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) + updateLogger.V(4).Info("Notifying pod of pending update", "workType", status.WorkType()) select { case podUpdates <- struct{}{}: default: } if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil { - logger.V(3).Info("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) + updateLogger.V(3).Info("Cancelling current pod sync", "workType", status.WorkType()) status.cancelFn() return }