From e9965e7f791b60b6454de355d8bd03f49db31e4f Mon Sep 17 00:00:00 2001 From: hoteye Date: Fri, 8 May 2026 08:51:10 +0800 Subject: [PATCH] kubelet: pass logger through pod API server Replace logger-only klog.FromContext(context.Background()) usage in the pod API server with an injected logger. This keeps the cleanup scoped to logging and does not change context cancellation behavior. --- pkg/kubelet/apis/pods/server.go | 33 +++++----- pkg/kubelet/apis/pods/server_unit_test.go | 77 +++++++++++++---------- pkg/kubelet/kubelet.go | 2 +- pkg/kubelet/status/status_manager.go | 18 +++--- 4 files changed, 67 insertions(+), 63 deletions(-) diff --git a/pkg/kubelet/apis/pods/server.go b/pkg/kubelet/apis/pods/server.go index 5b9fff3e7f6..86535a826ba 100644 --- a/pkg/kubelet/apis/pods/server.go +++ b/pkg/kubelet/apis/pods/server.go @@ -48,17 +48,17 @@ type broadcaster struct { incoming chan PodWatchEvent } -func NewBroadcaster() *broadcaster { +func NewBroadcaster(ctx context.Context) *broadcaster { b := &broadcaster{ clients: make(map[chan PodWatchEvent]struct{}), incoming: make(chan PodWatchEvent, 1000), } - go b.run() + go b.run(ctx) return b } -func (b *broadcaster) run() { - logger := klog.FromContext(context.Background()) +func (b *broadcaster) run(ctx context.Context) { + logger := klog.FromContext(ctx) for event := range b.incoming { b.lock.RLock() // We collect clients to drop to avoid modifying the map during iteration @@ -89,31 +89,28 @@ func (b *broadcaster) run() { } } -func (b *broadcaster) Register(client chan PodWatchEvent) { +func (b *broadcaster) Register(logger klog.Logger, client chan PodWatchEvent) { b.lock.Lock() defer b.lock.Unlock() b.clients[client] = struct{}{} - logger := klog.FromContext(context.Background()) logger.Info("Registered new watch client", "totalClients", len(b.clients)) } -func (b *broadcaster) Unregister(client chan PodWatchEvent) { +func (b *broadcaster) Unregister(logger klog.Logger, client chan PodWatchEvent) { b.lock.Lock() defer b.lock.Unlock() if _, ok := b.clients[client]; ok { delete(b.clients, client) close(client) } - logger := klog.FromContext(context.Background()) logger.Info("Unregistered watch client", "totalClients", len(b.clients)) } -func (b *broadcaster) Broadcast(event PodWatchEvent) { +func (b *broadcaster) Broadcast(logger klog.Logger, event PodWatchEvent) { select { case b.incoming <- event: default: // This should realistically never happen because run() purges slow clients. - logger := klog.FromContext(context.Background()) logger.Info("Broadcaster internal buffer full, dropping event.") metrics.PodWatchEventsDroppedTotal.Inc() } @@ -136,7 +133,7 @@ func NewPodsServer(broadcaster *broadcaster, podManager pod.Manager, statusProvi } } -// NewPodsServerForTest creates a new PodServer with an injectable ticker for testing. +// NewPodsServerForTest creates a new PodServer for testing. func NewPodsServerForTest(broadcaster *broadcaster, podManager pod.Manager, statusProvider podstatus.PodStatusProvider) *PodsServer { return &PodsServer{ podManager: podManager, @@ -146,20 +143,19 @@ func NewPodsServerForTest(broadcaster *broadcaster, podManager pod.Manager, stat } // OnPodUpdated is called when a pod's spec and status are coherent. -func (s *PodsServer) OnPodUpdated(pod *v1.Pod, status v1.PodStatus, isAdded bool) { +func (s *PodsServer) OnPodUpdated(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, isAdded bool) { eventType := watch.Modified if isAdded { eventType = watch.Added } podCopy := *pod podCopy.Status = status - s.broadcaster.Broadcast(PodWatchEvent{Type: eventType, UID: pod.UID, Pod: &podCopy}) - logger := klog.FromContext(context.Background()) + s.broadcaster.Broadcast(logger, PodWatchEvent{Type: eventType, UID: pod.UID, Pod: &podCopy}) logger.Info("Pod update broadcasted", "podUID", pod.UID, "type", eventType) } // OnPodRemoved is called when a pod is removed. -func (s *PodsServer) OnPodRemoved(pod *v1.Pod) { +func (s *PodsServer) OnPodRemoved(logger klog.Logger, pod *v1.Pod) { minimalPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, @@ -167,8 +163,7 @@ func (s *PodsServer) OnPodRemoved(pod *v1.Pod) { UID: pod.UID, }, } - s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Deleted, UID: pod.UID, Pod: minimalPod}) - logger := klog.FromContext(context.Background()) + s.broadcaster.Broadcast(logger, PodWatchEvent{Type: watch.Deleted, UID: pod.UID, Pod: minimalPod}) logger.Info("Pod removed broadcasted", "podUID", pod.UID) } @@ -237,9 +232,9 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1 logger.Info("WatchPods called", "client", clientAddr) clientChannel := make(chan PodWatchEvent, 100) - s.broadcaster.Register(clientChannel) + s.broadcaster.Register(logger, clientChannel) defer func() { - s.broadcaster.Unregister(clientChannel) + s.broadcaster.Unregister(logger, clientChannel) logger.Info("Watch client disconnected", "client", clientAddr) }() diff --git a/pkg/kubelet/apis/pods/server_unit_test.go b/pkg/kubelet/apis/pods/server_unit_test.go index 66059450a9e..1ea39a08ec9 100644 --- a/pkg/kubelet/apis/pods/server_unit_test.go +++ b/pkg/kubelet/apis/pods/server_unit_test.go @@ -45,22 +45,24 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" kubepodtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestStartEventLoop(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + logger, tCtx := ktesting.NewTestContext(t) + broadcaster := podsapi.NewBroadcaster(tCtx) mockManager := new(kubepodtest.MockManager) mockStatus := new(statustest.MockPodStatusProvider) server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}} clientChannel := make(chan podsapi.PodWatchEvent, 100) - broadcaster.Register(clientChannel) - defer broadcaster.Unregister(clientChannel) + broadcaster.Register(logger, clientChannel) + defer broadcaster.Unregister(logger, clientChannel) - server.OnPodUpdated(pod1, v1.PodStatus{Phase: v1.PodPending}, true) - server.OnPodUpdated(pod1, v1.PodStatus{Phase: v1.PodSucceeded}, false) - server.OnPodRemoved(pod1) + server.OnPodUpdated(logger, pod1, v1.PodStatus{Phase: v1.PodPending}, true) + server.OnPodUpdated(logger, pod1, v1.PodStatus{Phase: v1.PodSucceeded}, false) + server.OnPodRemoved(logger, pod1) event := <-clientChannel assert.Equal(t, watch.Added, event.Type) @@ -108,7 +110,8 @@ func (m *MockWatchPodsServer) Context() context.Context { } func TestWatchPods(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + logger, tCtx := ktesting.NewTestContext(t) + broadcaster := podsapi.NewBroadcaster(tCtx) mockManager := new(kubepodtest.MockManager) mockStatus := new(statustest.MockPodStatusProvider) server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus) @@ -118,11 +121,11 @@ func TestWatchPods(t *testing.T) { mockManager.On("GetPodByUID", types.UID("pod1-uid")).Return(pod1, true) mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false) - ctx, cancel := context.WithCancel(context.Background()) + streamCtx, cancel := context.WithCancel(tCtx) defer cancel() mockStream := &MockWatchPodsServer{ - Ctx: ctx, + Ctx: streamCtx, EventCh: make(chan *podsv1alpha1.WatchPodsEvent, 10), } @@ -144,7 +147,7 @@ func TestWatchPods(t *testing.T) { assert.Empty(t, event.Pod) // Trigger an update - server.OnPodUpdated(pod1, pod1.Status, false) + server.OnPodUpdated(logger, pod1, pod1.Status, false) // Verify MODIFIED event event = <-mockStream.EventCh @@ -155,7 +158,7 @@ func TestWatchPods(t *testing.T) { assert.Equal(t, "pod1", podOut.Name) // Trigger a removal - server.OnPodRemoved(pod1) + server.OnPodRemoved(logger, pod1) // Verify DELETED event event = <-mockStream.EventCh @@ -170,7 +173,8 @@ func TestWatchPods(t *testing.T) { } func TestListPods(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + tCtx := ktesting.Init(t) + broadcaster := podsapi.NewBroadcaster(tCtx) mockManager := new(kubepodtest.MockManager) mockStatus := new(statustest.MockPodStatusProvider) server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus) @@ -180,7 +184,7 @@ func TestListPods(t *testing.T) { mockManager.On("GetPods").Return([]*v1.Pod{pod1, pod2}) mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false) - resp, err := server.ListPods(context.Background(), &podsv1alpha1.ListPodsRequest{}) + resp, err := server.ListPods(tCtx, &podsv1alpha1.ListPodsRequest{}) require.NoError(t, err) pod1Out := &v1.Pod{} err = pod1Out.Unmarshal(resp.Pods[0]) @@ -193,7 +197,8 @@ func TestListPods(t *testing.T) { } func TestGetPod(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + tCtx := ktesting.Init(t) + broadcaster := podsapi.NewBroadcaster(tCtx) mockManager := new(kubepodtest.MockManager) mockStatus := new(statustest.MockPodStatusProvider) server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus) @@ -214,7 +219,7 @@ func TestGetPod(t *testing.T) { mockManager.On("GetPodByUID", mock.Anything).Return(pod1, true) mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false) - resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "pod1-uid"}) + resp, err := server.GetPod(tCtx, &podsv1alpha1.GetPodRequest{PodUID: "pod1-uid"}) require.NoError(t, err) podOut := &v1.Pod{} err = podOut.Unmarshal(resp.Pod) @@ -230,7 +235,8 @@ func TestGetPod(t *testing.T) { } func TestStaticPod(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + tCtx := ktesting.Init(t) + broadcaster := podsapi.NewBroadcaster(tCtx) mockManager := new(kubepodtest.MockManager) mockStatus := new(statustest.MockPodStatusProvider) server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus) @@ -249,7 +255,7 @@ func TestStaticPod(t *testing.T) { mockManager.On("GetPodByUID", types.UID("static-pod-uid")).Return(staticPod, true) mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false) - resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "static-pod-uid"}) + resp, err := server.GetPod(tCtx, &podsv1alpha1.GetPodRequest{PodUID: "static-pod-uid"}) require.NoError(t, err) podOut := &v1.Pod{} @@ -263,27 +269,28 @@ func TestErrorsAndMetrics(t *testing.T) { metrics.Register() t.Run("DroppedWatchEventIncrementsMetric", func(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + logger, tCtx := ktesting.NewTestContext(t) + broadcaster := podsapi.NewBroadcaster(tCtx) mockManager := new(kubepodtest.MockManager) mockStatus := new(statustest.MockPodStatusProvider) server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}} mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false) - server.OnPodUpdated(pod1, pod1.Status, true) + server.OnPodUpdated(logger, pod1, pod1.Status, true) // Reset the metric before the test metrics.PodWatchEventsDroppedTotal.Reset() clientChannel := make(chan podsapi.PodWatchEvent, 1) // Buffered channel of size 1 - broadcaster.Register(clientChannel) - defer broadcaster.Unregister(clientChannel) + broadcaster.Register(logger, clientChannel) + defer broadcaster.Unregister(logger, clientChannel) // Send two events. The first one should fill the buffer. - broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "1"}) + broadcaster.Broadcast(logger, podsapi.PodWatchEvent{UID: "1"}) // The second one should be dropped and increment the metric. - broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "2"}) + broadcaster.Broadcast(logger, podsapi.PodWatchEvent{UID: "2"}) // Wait for the background goroutine to process the events and update metrics - err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(tCtx, 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) { count, err := testutil.GetCounterMetricValue(metrics.PodWatchEventsDroppedTotal) if err != nil { return false, err @@ -336,21 +343,22 @@ func TestSerialize(t *testing.T) { } func TestBroadcaster_SlowClient(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + logger, tCtx := ktesting.NewTestContext(t) + broadcaster := podsapi.NewBroadcaster(tCtx) numFastClients := 3 numEvents := 10 fastClients := make([]chan podsapi.PodWatchEvent, numFastClients) for i := range numFastClients { fastClients[i] = make(chan podsapi.PodWatchEvent, numEvents) - broadcaster.Register(fastClients[i]) + broadcaster.Register(logger, fastClients[i]) } slowClient := make(chan podsapi.PodWatchEvent) - broadcaster.Register(slowClient) + broadcaster.Register(logger, slowClient) for i := range numEvents { - broadcaster.Broadcast(podsapi.PodWatchEvent{UID: types.UID(fmt.Sprintf("event-%d", i))}) + broadcaster.Broadcast(logger, podsapi.PodWatchEvent{UID: types.UID(fmt.Sprintf("event-%d", i))}) } for i := range numFastClients { @@ -365,7 +373,7 @@ func TestBroadcaster_SlowClient(t *testing.T) { } } - err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(tCtx, 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) { select { case _, ok := <-slowClient: if !ok { @@ -381,7 +389,8 @@ func TestBroadcaster_SlowClient(t *testing.T) { } func TestStatusOverlay(t *testing.T) { - broadcaster := podsapi.NewBroadcaster() + tCtx := ktesting.Init(t) + broadcaster := podsapi.NewBroadcaster(tCtx) mockManager := new(kubepodtest.MockManager) mockStatus := new(statustest.MockPodStatusProvider) server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus) @@ -396,7 +405,7 @@ func TestStatusOverlay(t *testing.T) { mockStatus.On("GetPodStatus", pod1.UID).Return(overlaidStatus, true) t.Run("GetPod overlays status", func(t *testing.T) { - resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: string(pod1.UID)}) + resp, err := server.GetPod(tCtx, &podsv1alpha1.GetPodRequest{PodUID: string(pod1.UID)}) require.NoError(t, err) podOut := &v1.Pod{} err = podOut.Unmarshal(resp.Pod) @@ -405,7 +414,7 @@ func TestStatusOverlay(t *testing.T) { }) t.Run("ListPods overlays status", func(t *testing.T) { - resp, err := server.ListPods(context.Background(), &podsv1alpha1.ListPodsRequest{}) + resp, err := server.ListPods(tCtx, &podsv1alpha1.ListPodsRequest{}) require.NoError(t, err) podOut := &v1.Pod{} err = podOut.Unmarshal(resp.Pods[0]) @@ -414,10 +423,10 @@ func TestStatusOverlay(t *testing.T) { }) t.Run("WatchPods overlays status in initial sync", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + streamCtx, cancel := context.WithCancel(tCtx) defer cancel() mockStream := &MockWatchPodsServer{ - Ctx: ctx, + Ctx: streamCtx, EventCh: make(chan *podsv1alpha1.WatchPodsEvent, 10), } go func() { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0580546f151..9379fe3e40c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -697,7 +697,7 @@ func NewMainKubelet(ctx context.Context, klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker) if utilfeature.DefaultFeatureGate.Enabled(features.PodsAPI) { - broadcaster := pods.NewBroadcaster() + broadcaster := pods.NewBroadcaster(ctx) klet.podsServer = pods.NewPodsServer(broadcaster, klet.podManager, klet.statusManager) klet.statusManager.AddPodUpdateNotifier(klet.podsServer) } diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index bac491cb7d2..f5d8d1c8ac3 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -131,9 +131,9 @@ type PodStartupLatencyStateHelper interface { // PodUpdateNotifier is an interface for receiving pod status updates. type PodUpdateNotifier interface { // OnPodUpdated is called when a pod's status is updated. - OnPodUpdated(pod *v1.Pod, status v1.PodStatus, isAdded bool) + OnPodUpdated(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, isAdded bool) // OnPodRemoved is called when a pod is terminated. - OnPodRemoved(pod *v1.Pod) + OnPodRemoved(logger klog.Logger, pod *v1.Pod) } type podStatusNotification struct { @@ -465,7 +465,7 @@ func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodSta var notification *podStatusNotification defer func() { if notification != nil { - m.sendNotification(notification) + m.sendNotification(logger, notification) } }() @@ -491,7 +491,7 @@ func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, co var notification *podStatusNotification defer func() { if notification != nil { - m.sendNotification(notification) + m.sendNotification(logger, notification) } }() @@ -561,7 +561,7 @@ func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, cont var notification *podStatusNotification defer func() { if notification != nil { - m.sendNotification(notification) + m.sendNotification(logger, notification) } }() @@ -638,7 +638,7 @@ func (m *manager) TerminatePod(logger klog.Logger, pod *v1.Pod) { var notification *podStatusNotification defer func() { if notification != nil { - m.sendNotification(notification) + m.sendNotification(logger, notification) } }() @@ -1011,12 +1011,12 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v return true, notification } -func (m *manager) sendNotification(n *podStatusNotification) { +func (m *manager) sendNotification(logger klog.Logger, n *podStatusNotification) { for _, notifier := range m.notifiers { if !n.podIsFinished { - notifier.OnPodUpdated(n.pod, n.status, n.isAdded) + notifier.OnPodUpdated(logger, n.pod, n.status, n.isAdded) } else { - notifier.OnPodRemoved(n.pod) + notifier.OnPodRemoved(logger, n.pod) } } }