Merge pull request #138876 from hoteye/cleanup-kubelet-pods-server-logger

kubelet: pass logger through pod API server
This commit is contained in:
Kubernetes Prow Robot 2026-05-13 05:32:28 +05:30 committed by GitHub
commit 7a4ea9be33
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 67 additions and 63 deletions

View file

@ -48,17 +48,17 @@ type broadcaster struct {
incoming chan PodWatchEvent
}
func NewBroadcaster() *broadcaster {
func NewBroadcaster(ctx context.Context) *broadcaster {
b := &broadcaster{
clients: make(map[chan PodWatchEvent]struct{}),
incoming: make(chan PodWatchEvent, 1000),
}
go b.run()
go b.run(ctx)
return b
}
func (b *broadcaster) run() {
logger := klog.FromContext(context.Background())
func (b *broadcaster) run(ctx context.Context) {
logger := klog.FromContext(ctx)
for event := range b.incoming {
b.lock.RLock()
// We collect clients to drop to avoid modifying the map during iteration
@ -89,31 +89,28 @@ func (b *broadcaster) run() {
}
}
func (b *broadcaster) Register(client chan PodWatchEvent) {
func (b *broadcaster) Register(logger klog.Logger, client chan PodWatchEvent) {
b.lock.Lock()
defer b.lock.Unlock()
b.clients[client] = struct{}{}
logger := klog.FromContext(context.Background())
logger.Info("Registered new watch client", "totalClients", len(b.clients))
}
func (b *broadcaster) Unregister(client chan PodWatchEvent) {
func (b *broadcaster) Unregister(logger klog.Logger, client chan PodWatchEvent) {
b.lock.Lock()
defer b.lock.Unlock()
if _, ok := b.clients[client]; ok {
delete(b.clients, client)
close(client)
}
logger := klog.FromContext(context.Background())
logger.Info("Unregistered watch client", "totalClients", len(b.clients))
}
func (b *broadcaster) Broadcast(event PodWatchEvent) {
func (b *broadcaster) Broadcast(logger klog.Logger, event PodWatchEvent) {
select {
case b.incoming <- event:
default:
// This should realistically never happen because run() purges slow clients.
logger := klog.FromContext(context.Background())
logger.Info("Broadcaster internal buffer full, dropping event.")
metrics.PodWatchEventsDroppedTotal.Inc()
}
@ -136,7 +133,7 @@ func NewPodsServer(broadcaster *broadcaster, podManager pod.Manager, statusProvi
}
}
// NewPodsServerForTest creates a new PodServer with an injectable ticker for testing.
// NewPodsServerForTest creates a new PodServer for testing.
func NewPodsServerForTest(broadcaster *broadcaster, podManager pod.Manager, statusProvider podstatus.PodStatusProvider) *PodsServer {
return &PodsServer{
podManager: podManager,
@ -146,20 +143,19 @@ func NewPodsServerForTest(broadcaster *broadcaster, podManager pod.Manager, stat
}
// OnPodUpdated is called when a pod's spec and status are coherent.
func (s *PodsServer) OnPodUpdated(pod *v1.Pod, status v1.PodStatus, isAdded bool) {
func (s *PodsServer) OnPodUpdated(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, isAdded bool) {
eventType := watch.Modified
if isAdded {
eventType = watch.Added
}
podCopy := *pod
podCopy.Status = status
s.broadcaster.Broadcast(PodWatchEvent{Type: eventType, UID: pod.UID, Pod: &podCopy})
logger := klog.FromContext(context.Background())
s.broadcaster.Broadcast(logger, PodWatchEvent{Type: eventType, UID: pod.UID, Pod: &podCopy})
logger.Info("Pod update broadcasted", "podUID", pod.UID, "type", eventType)
}
// OnPodRemoved is called when a pod is removed.
func (s *PodsServer) OnPodRemoved(pod *v1.Pod) {
func (s *PodsServer) OnPodRemoved(logger klog.Logger, pod *v1.Pod) {
minimalPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
@ -167,8 +163,7 @@ func (s *PodsServer) OnPodRemoved(pod *v1.Pod) {
UID: pod.UID,
},
}
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Deleted, UID: pod.UID, Pod: minimalPod})
logger := klog.FromContext(context.Background())
s.broadcaster.Broadcast(logger, PodWatchEvent{Type: watch.Deleted, UID: pod.UID, Pod: minimalPod})
logger.Info("Pod removed broadcasted", "podUID", pod.UID)
}
@ -237,9 +232,9 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1
logger.Info("WatchPods called", "client", clientAddr)
clientChannel := make(chan PodWatchEvent, 100)
s.broadcaster.Register(clientChannel)
s.broadcaster.Register(logger, clientChannel)
defer func() {
s.broadcaster.Unregister(clientChannel)
s.broadcaster.Unregister(logger, clientChannel)
logger.Info("Watch client disconnected", "client", clientAddr)
}()

View file

@ -45,22 +45,24 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubepodtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestStartEventLoop(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
logger, tCtx := ktesting.NewTestContext(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
mockManager := new(kubepodtest.MockManager)
mockStatus := new(statustest.MockPodStatusProvider)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
clientChannel := make(chan podsapi.PodWatchEvent, 100)
broadcaster.Register(clientChannel)
defer broadcaster.Unregister(clientChannel)
broadcaster.Register(logger, clientChannel)
defer broadcaster.Unregister(logger, clientChannel)
server.OnPodUpdated(pod1, v1.PodStatus{Phase: v1.PodPending}, true)
server.OnPodUpdated(pod1, v1.PodStatus{Phase: v1.PodSucceeded}, false)
server.OnPodRemoved(pod1)
server.OnPodUpdated(logger, pod1, v1.PodStatus{Phase: v1.PodPending}, true)
server.OnPodUpdated(logger, pod1, v1.PodStatus{Phase: v1.PodSucceeded}, false)
server.OnPodRemoved(logger, pod1)
event := <-clientChannel
assert.Equal(t, watch.Added, event.Type)
@ -108,7 +110,8 @@ func (m *MockWatchPodsServer) Context() context.Context {
}
func TestWatchPods(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
logger, tCtx := ktesting.NewTestContext(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
mockManager := new(kubepodtest.MockManager)
mockStatus := new(statustest.MockPodStatusProvider)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus)
@ -118,11 +121,11 @@ func TestWatchPods(t *testing.T) {
mockManager.On("GetPodByUID", types.UID("pod1-uid")).Return(pod1, true)
mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false)
ctx, cancel := context.WithCancel(context.Background())
streamCtx, cancel := context.WithCancel(tCtx)
defer cancel()
mockStream := &MockWatchPodsServer{
Ctx: ctx,
Ctx: streamCtx,
EventCh: make(chan *podsv1alpha1.WatchPodsEvent, 10),
}
@ -144,7 +147,7 @@ func TestWatchPods(t *testing.T) {
assert.Empty(t, event.Pod)
// Trigger an update
server.OnPodUpdated(pod1, pod1.Status, false)
server.OnPodUpdated(logger, pod1, pod1.Status, false)
// Verify MODIFIED event
event = <-mockStream.EventCh
@ -155,7 +158,7 @@ func TestWatchPods(t *testing.T) {
assert.Equal(t, "pod1", podOut.Name)
// Trigger a removal
server.OnPodRemoved(pod1)
server.OnPodRemoved(logger, pod1)
// Verify DELETED event
event = <-mockStream.EventCh
@ -170,7 +173,8 @@ func TestWatchPods(t *testing.T) {
}
func TestListPods(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
tCtx := ktesting.Init(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
mockManager := new(kubepodtest.MockManager)
mockStatus := new(statustest.MockPodStatusProvider)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus)
@ -180,7 +184,7 @@ func TestListPods(t *testing.T) {
mockManager.On("GetPods").Return([]*v1.Pod{pod1, pod2})
mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false)
resp, err := server.ListPods(context.Background(), &podsv1alpha1.ListPodsRequest{})
resp, err := server.ListPods(tCtx, &podsv1alpha1.ListPodsRequest{})
require.NoError(t, err)
pod1Out := &v1.Pod{}
err = pod1Out.Unmarshal(resp.Pods[0])
@ -193,7 +197,8 @@ func TestListPods(t *testing.T) {
}
func TestGetPod(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
tCtx := ktesting.Init(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
mockManager := new(kubepodtest.MockManager)
mockStatus := new(statustest.MockPodStatusProvider)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus)
@ -214,7 +219,7 @@ func TestGetPod(t *testing.T) {
mockManager.On("GetPodByUID", mock.Anything).Return(pod1, true)
mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false)
resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "pod1-uid"})
resp, err := server.GetPod(tCtx, &podsv1alpha1.GetPodRequest{PodUID: "pod1-uid"})
require.NoError(t, err)
podOut := &v1.Pod{}
err = podOut.Unmarshal(resp.Pod)
@ -230,7 +235,8 @@ func TestGetPod(t *testing.T) {
}
func TestStaticPod(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
tCtx := ktesting.Init(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
mockManager := new(kubepodtest.MockManager)
mockStatus := new(statustest.MockPodStatusProvider)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus)
@ -249,7 +255,7 @@ func TestStaticPod(t *testing.T) {
mockManager.On("GetPodByUID", types.UID("static-pod-uid")).Return(staticPod, true)
mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false)
resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "static-pod-uid"})
resp, err := server.GetPod(tCtx, &podsv1alpha1.GetPodRequest{PodUID: "static-pod-uid"})
require.NoError(t, err)
podOut := &v1.Pod{}
@ -263,27 +269,28 @@ func TestErrorsAndMetrics(t *testing.T) {
metrics.Register()
t.Run("DroppedWatchEventIncrementsMetric", func(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
logger, tCtx := ktesting.NewTestContext(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
mockManager := new(kubepodtest.MockManager)
mockStatus := new(statustest.MockPodStatusProvider)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
mockStatus.On("GetPodStatus", mock.Anything).Return(v1.PodStatus{}, false)
server.OnPodUpdated(pod1, pod1.Status, true)
server.OnPodUpdated(logger, pod1, pod1.Status, true)
// Reset the metric before the test
metrics.PodWatchEventsDroppedTotal.Reset()
clientChannel := make(chan podsapi.PodWatchEvent, 1) // Buffered channel of size 1
broadcaster.Register(clientChannel)
defer broadcaster.Unregister(clientChannel)
broadcaster.Register(logger, clientChannel)
defer broadcaster.Unregister(logger, clientChannel)
// Send two events. The first one should fill the buffer.
broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "1"})
broadcaster.Broadcast(logger, podsapi.PodWatchEvent{UID: "1"})
// The second one should be dropped and increment the metric.
broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "2"})
broadcaster.Broadcast(logger, podsapi.PodWatchEvent{UID: "2"})
// Wait for the background goroutine to process the events and update metrics
err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) {
err := wait.PollUntilContextTimeout(tCtx, 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) {
count, err := testutil.GetCounterMetricValue(metrics.PodWatchEventsDroppedTotal)
if err != nil {
return false, err
@ -336,21 +343,22 @@ func TestSerialize(t *testing.T) {
}
func TestBroadcaster_SlowClient(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
logger, tCtx := ktesting.NewTestContext(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
numFastClients := 3
numEvents := 10
fastClients := make([]chan podsapi.PodWatchEvent, numFastClients)
for i := range numFastClients {
fastClients[i] = make(chan podsapi.PodWatchEvent, numEvents)
broadcaster.Register(fastClients[i])
broadcaster.Register(logger, fastClients[i])
}
slowClient := make(chan podsapi.PodWatchEvent)
broadcaster.Register(slowClient)
broadcaster.Register(logger, slowClient)
for i := range numEvents {
broadcaster.Broadcast(podsapi.PodWatchEvent{UID: types.UID(fmt.Sprintf("event-%d", i))})
broadcaster.Broadcast(logger, podsapi.PodWatchEvent{UID: types.UID(fmt.Sprintf("event-%d", i))})
}
for i := range numFastClients {
@ -365,7 +373,7 @@ func TestBroadcaster_SlowClient(t *testing.T) {
}
}
err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) {
err := wait.PollUntilContextTimeout(tCtx, 10*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) {
select {
case _, ok := <-slowClient:
if !ok {
@ -381,7 +389,8 @@ func TestBroadcaster_SlowClient(t *testing.T) {
}
func TestStatusOverlay(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
tCtx := ktesting.Init(t)
broadcaster := podsapi.NewBroadcaster(tCtx)
mockManager := new(kubepodtest.MockManager)
mockStatus := new(statustest.MockPodStatusProvider)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager, mockStatus)
@ -396,7 +405,7 @@ func TestStatusOverlay(t *testing.T) {
mockStatus.On("GetPodStatus", pod1.UID).Return(overlaidStatus, true)
t.Run("GetPod overlays status", func(t *testing.T) {
resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: string(pod1.UID)})
resp, err := server.GetPod(tCtx, &podsv1alpha1.GetPodRequest{PodUID: string(pod1.UID)})
require.NoError(t, err)
podOut := &v1.Pod{}
err = podOut.Unmarshal(resp.Pod)
@ -405,7 +414,7 @@ func TestStatusOverlay(t *testing.T) {
})
t.Run("ListPods overlays status", func(t *testing.T) {
resp, err := server.ListPods(context.Background(), &podsv1alpha1.ListPodsRequest{})
resp, err := server.ListPods(tCtx, &podsv1alpha1.ListPodsRequest{})
require.NoError(t, err)
podOut := &v1.Pod{}
err = podOut.Unmarshal(resp.Pods[0])
@ -414,10 +423,10 @@ func TestStatusOverlay(t *testing.T) {
})
t.Run("WatchPods overlays status in initial sync", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
streamCtx, cancel := context.WithCancel(tCtx)
defer cancel()
mockStream := &MockWatchPodsServer{
Ctx: ctx,
Ctx: streamCtx,
EventCh: make(chan *podsv1alpha1.WatchPodsEvent, 10),
}
go func() {

View file

@ -697,7 +697,7 @@ func NewMainKubelet(ctx context.Context,
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
if utilfeature.DefaultFeatureGate.Enabled(features.PodsAPI) {
broadcaster := pods.NewBroadcaster()
broadcaster := pods.NewBroadcaster(ctx)
klet.podsServer = pods.NewPodsServer(broadcaster, klet.podManager, klet.statusManager)
klet.statusManager.AddPodUpdateNotifier(klet.podsServer)
}

View file

@ -131,9 +131,9 @@ type PodStartupLatencyStateHelper interface {
// PodUpdateNotifier is an interface for receiving pod status updates.
type PodUpdateNotifier interface {
// OnPodUpdated is called when a pod's status is updated.
OnPodUpdated(pod *v1.Pod, status v1.PodStatus, isAdded bool)
OnPodUpdated(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, isAdded bool)
// OnPodRemoved is called when a pod is terminated.
OnPodRemoved(pod *v1.Pod)
OnPodRemoved(logger klog.Logger, pod *v1.Pod)
}
type podStatusNotification struct {
@ -465,7 +465,7 @@ func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodSta
var notification *podStatusNotification
defer func() {
if notification != nil {
m.sendNotification(notification)
m.sendNotification(logger, notification)
}
}()
@ -491,7 +491,7 @@ func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, co
var notification *podStatusNotification
defer func() {
if notification != nil {
m.sendNotification(notification)
m.sendNotification(logger, notification)
}
}()
@ -561,7 +561,7 @@ func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, cont
var notification *podStatusNotification
defer func() {
if notification != nil {
m.sendNotification(notification)
m.sendNotification(logger, notification)
}
}()
@ -638,7 +638,7 @@ func (m *manager) TerminatePod(logger klog.Logger, pod *v1.Pod) {
var notification *podStatusNotification
defer func() {
if notification != nil {
m.sendNotification(notification)
m.sendNotification(logger, notification)
}
}()
@ -1011,12 +1011,12 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v
return true, notification
}
func (m *manager) sendNotification(n *podStatusNotification) {
func (m *manager) sendNotification(logger klog.Logger, n *podStatusNotification) {
for _, notifier := range m.notifiers {
if !n.podIsFinished {
notifier.OnPodUpdated(n.pod, n.status, n.isAdded)
notifier.OnPodUpdated(logger, n.pod, n.status, n.isAdded)
} else {
notifier.OnPodRemoved(n.pod)
notifier.OnPodRemoved(logger, n.pod)
}
}
}