From fd330c303da4816e44a264cecfbb83f1ee598ada Mon Sep 17 00:00:00 2001 From: Brian Sonnenberg Date: Tue, 16 Dec 2025 20:29:55 +0000 Subject: [PATCH] Refactor PodsServer to use PodManager as source of truth - Fixed version in kube_features.go after rebase (1.35->1.36) - Removed internal pod cache in PodsServer to reduce memory footprint and avoid duplication. - Injected pod.Manager into PodsServer to serve as the single source of truth for pod data. - Refactored WatchPods to broadcast UIDs and fetch fresh pod data from podManager, ensuring consistency. - Updated convertWatchEventType to safely handle unknown event types. - Refactored unit tests to use MockManager and added a test case for static pods. - Updated e2e suite with static pod test --- pkg/features/kube_features.go | 2 +- pkg/kubelet/apis/pods/server.go | 117 ++++++++++------------ pkg/kubelet/apis/pods/server_unit_test.go | 84 +++++++++++----- pkg/kubelet/kubelet.go | 2 +- test/e2e_node/pods_api_test.go | 92 +++++++++++++++++ 5 files changed, 208 insertions(+), 89 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 5c5b94b11a3..5c51fcd0cb3 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -1734,7 +1734,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate }, PodsAPI: { - {Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha}, + {Version: version.MustParse("1.36"), Default: false, PreRelease: featuregate.Alpha}, }, PortForwardWebsockets: { diff --git a/pkg/kubelet/apis/pods/server.go b/pkg/kubelet/apis/pods/server.go index d94eb8a8484..85413e29644 100644 --- a/pkg/kubelet/apis/pods/server.go +++ b/pkg/kubelet/apis/pods/server.go @@ -31,10 +31,12 @@ import ( "k8s.io/klog/v2" podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/metrics" + "k8s.io/kubernetes/pkg/kubelet/pod" ) type PodWatchEvent struct { Type watch.EventType + UID types.UID Pod 
*v1.Pod } @@ -82,92 +84,52 @@ func (b *broadcaster) Broadcast(event PodWatchEvent) { // PodsServer is the gRPC server that provides pod information. type PodsServer struct { podsv1alpha1.UnimplementedPodsServer - lock sync.RWMutex - pods map[types.UID]*v1.Pod + podManager pod.Manager broadcaster *broadcaster } // NewPodsServer creates a new PodServer for production use. -func NewPodsServer(broadcaster *broadcaster) *PodsServer { +func NewPodsServer(broadcaster *broadcaster, podManager pod.Manager) *PodsServer { return &PodsServer{ - pods: make(map[types.UID]*v1.Pod), + podManager: podManager, broadcaster: broadcaster, } } // NewPodsServerForTest creates a new PodServer with an injectable ticker for testing. -func NewPodsServerForTest(broadcaster *broadcaster) *PodsServer { +func NewPodsServerForTest(broadcaster *broadcaster, podManager pod.Manager) *PodsServer { return &PodsServer{ - pods: make(map[types.UID]*v1.Pod), + podManager: podManager, broadcaster: broadcaster, } } // OnPodAdded is called when a pod is added. func (s *PodsServer) OnPodAdded(pod *v1.Pod) { - s.lock.Lock() - defer s.lock.Unlock() - s.pods[pod.UID] = pod.DeepCopy() - s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Added, Pod: pod}) + s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Added, UID: pod.UID}) logger := klog.FromContext(context.Background()) logger.Info("Pod added to storage", "podUID", pod.UID) } // OnPodUpdated is called when a pod is updated. func (s *PodsServer) OnPodUpdated(pod *v1.Pod) { - s.lock.Lock() - defer s.lock.Unlock() - s.pods[pod.UID] = pod.DeepCopy() - s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, Pod: pod}) + s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, UID: pod.UID}) logger := klog.FromContext(context.Background()) logger.Info("Pod updated in storage", "podUID", pod.UID) } // OnPodRemoved is called when a pod is removed. 
func (s *PodsServer) OnPodRemoved(pod *v1.Pod) { - s.lock.Lock() - defer s.lock.Unlock() - delete(s.pods, pod.UID) - s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Deleted, Pod: pod.DeepCopy()}) + s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Deleted, UID: pod.UID, Pod: pod}) logger := klog.FromContext(context.Background()) logger.Info("Pod removed from storage", "podUID", pod.UID) } // OnPodStatusUpdated is called when a pod's status is updated. func (s *PodsServer) OnPodStatusUpdated(pod *v1.Pod, status v1.PodStatus) { - s.lock.Lock() - defer s.lock.Unlock() - if storedPod, ok := s.pods[pod.UID]; ok { - storedPod.Status = status - s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, Pod: storedPod}) - logger := klog.FromContext(context.Background()) - logger.Info("Pod status updated in storage", "podUID", pod.UID) - } -} - -// Get returns a pod by UID. -func (s *PodsServer) Get(uid types.UID) (*v1.Pod, bool) { - s.lock.RLock() - defer s.lock.RUnlock() - pod, ok := s.pods[uid] - if !ok { - return nil, false - } - return pod.DeepCopy(), true -} - -// List returns all pods. -func (s *PodsServer) List() []*v1.Pod { - s.lock.RLock() - defer s.lock.RUnlock() - pods := make([]*v1.Pod, 0, len(s.pods)) - for _, pod := range s.pods { - pods = append(pods, pod.DeepCopy()) - } - sort.Slice(pods, func(i, j int) bool { - return pods[i].UID < pods[j].UID - }) - return pods + s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, UID: pod.UID}) + logger := klog.FromContext(context.Background()) + logger.Info("Pod status updated in storage", "podUID", pod.UID) } // ListPods returns a list of pods. 
@@ -176,7 +138,10 @@ func (s *PodsServer) ListPods(ctx context.Context, req *podsv1alpha1.ListPodsReq logger.Info("ListPods called") // TODO: Implement filtering based on req.Filter, pagination with req.PageToken and req.PageSize - podsToReturn := s.List() + podsToReturn := s.podManager.GetPods() + sort.Slice(podsToReturn, func(i, j int) bool { + return podsToReturn[i].UID < podsToReturn[j].UID + }) protoPods := make([][]byte, len(podsToReturn)) for i, p := range podsToReturn { @@ -196,7 +161,7 @@ func (s *PodsServer) GetPod(ctx context.Context, req *podsv1alpha1.GetPodRequest logger.Info("GetPod called", "podUID", req.PodUID) podUID := types.UID(req.PodUID) - pod, ok := s.Get(podUID) + pod, ok := s.podManager.GetPodByUID(podUID) if !ok { return nil, status.Errorf(codes.NotFound, "pod with UID %s not found", req.PodUID) } @@ -226,7 +191,11 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1 }() // Send initial ADDED events - initialPods := s.List() + initialPods := s.podManager.GetPods() + sort.Slice(initialPods, func(i, j int) bool { + return initialPods[i].UID < initialPods[j].UID + }) + for _, p := range initialPods { podBytes, err := p.Marshal() if err != nil { @@ -249,14 +218,38 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1 logger.Info("Watch context cancelled", "client", clientAddr) return stream.Context().Err() case event := <-clientChannel: - podBytes, err := event.Pod.Marshal() + var podToMarshal *v1.Pod + if event.Type == watch.Deleted { + podToMarshal = event.Pod + } else { + p, ok := s.podManager.GetPodByUID(event.UID) + if ok { + podToMarshal = p + } else { + logger.Info("Pod not found in manager during watch event processing", "uid", event.UID, "type", event.Type) + continue + } + } + + if podToMarshal == nil { + continue + } + + podBytes, err := podToMarshal.Marshal() if err != nil { logger.Error(err, "Error marshalling watch event pod") metrics.PodWatchEventsDroppedTotal.Inc() 
continue } + + eventType, err := convertWatchEventType(event.Type) + if err != nil { + logger.Error(err, "Unknown watch event type") + continue + } + if err := stream.Send(&podsv1alpha1.WatchPodsEvent{ - Type: convertWatchEventType(event.Type), + Type: eventType, Pod: podBytes, }); err != nil { logger.Error(err, "Error sending watch event to client", "client", clientAddr) @@ -266,15 +259,15 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1 } } -func convertWatchEventType(watchType watch.EventType) podsv1alpha1.EventType { +func convertWatchEventType(watchType watch.EventType) (podsv1alpha1.EventType, error) { switch watchType { case watch.Added: - return podsv1alpha1.EventType_ADDED + return podsv1alpha1.EventType_ADDED, nil case watch.Modified: - return podsv1alpha1.EventType_MODIFIED + return podsv1alpha1.EventType_MODIFIED, nil case watch.Deleted: - return podsv1alpha1.EventType_DELETED + return podsv1alpha1.EventType_DELETED, nil default: - return podsv1alpha1.EventType_ADDED + return podsv1alpha1.EventType_ADDED, status.Errorf(codes.Internal, "unknown watch event type: %v", watchType) } -} +} \ No newline at end of file diff --git a/pkg/kubelet/apis/pods/server_unit_test.go b/pkg/kubelet/apis/pods/server_unit_test.go index 8e86422ee12..003398dfb69 100644 --- a/pkg/kubelet/apis/pods/server_unit_test.go +++ b/pkg/kubelet/apis/pods/server_unit_test.go @@ -30,50 +30,53 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" metafuzzer "k8s.io/apimachinery/pkg/apis/meta/fuzzer" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/component-base/metrics/testutil" podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1" "k8s.io/kubernetes/pkg/api/legacyscheme" corefuzzer "k8s.io/kubernetes/pkg/apis/core/fuzzer" - podsapi "k8s.io/kubernetes/pkg/kubelet/apis/pods" + podsapi "k8s.io/kubernetes/pkg/kubelet/apis/pods" "k8s.io/kubernetes/pkg/kubelet/metrics" + kubepodtest 
"k8s.io/kubernetes/pkg/kubelet/pod/testing" ) func TestStartEventLoop(t *testing.T) { broadcaster := podsapi.NewBroadcaster() - server := podsapi.NewPodsServerForTest(broadcaster) + mockManager := new(kubepodtest.MockManager) + server := podsapi.NewPodsServerForTest(broadcaster, mockManager) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}} - pod1Running := pod1.DeepCopy() - pod1Running.Status = v1.PodStatus{Phase: v1.PodRunning} - pod1Succeeded := pod1.DeepCopy() - pod1Succeeded.Status = v1.PodStatus{Phase: v1.PodSucceeded} + clientChannel := make(chan podsapi.PodWatchEvent, 100) broadcaster.Register(clientChannel) defer broadcaster.Unregister(clientChannel) - server.OnPodAdded(pod1Running) - server.OnPodStatusUpdated(pod1, pod1Succeeded.Status) + + server.OnPodAdded(pod1) + server.OnPodStatusUpdated(pod1, v1.PodStatus{Phase: v1.PodSucceeded}) server.OnPodRemoved(pod1) + event := <-clientChannel assert.Equal(t, "ADDED", string(event.Type)) - assert.Equal(t, pod1.UID, event.Pod.UID) + assert.Equal(t, pod1.UID, event.UID) + event = <-clientChannel assert.Equal(t, "MODIFIED", string(event.Type)) - assert.Equal(t, v1.PodSucceeded, event.Pod.Status.Phase) + assert.Equal(t, pod1.UID, event.UID) + event = <-clientChannel assert.Equal(t, "DELETED", string(event.Type)) - assert.Equal(t, pod1.UID, event.Pod.UID) + assert.Equal(t, pod1.UID, event.UID) + assert.Equal(t, pod1, event.Pod) } func TestListPods(t *testing.T) { broadcaster := podsapi.NewBroadcaster() - server := podsapi.NewPodsServerForTest(broadcaster) + mockManager := new(kubepodtest.MockManager) + server := podsapi.NewPodsServerForTest(broadcaster, mockManager) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}} pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod2-uid", Name: "pod2", Namespace: "ns2"}} - status1 := v1.PodStatus{Phase: v1.PodRunning} - status2 := v1.PodStatus{Phase: v1.PodSucceeded} - 
server.OnPodAdded(pod1) - server.OnPodAdded(pod2) - server.OnPodStatusUpdated(pod1, status1) - server.OnPodStatusUpdated(pod2, status2) + + mockManager.On("GetPods").Return([]*v1.Pod{pod1, pod2}) + resp, err := server.ListPods(context.Background(), &podsv1alpha1.ListPodsRequest{}) require.NoError(t, err) pod1Out := &v1.Pod{} @@ -88,7 +91,8 @@ func TestListPods(t *testing.T) { func TestGetPod(t *testing.T) { broadcaster := podsapi.NewBroadcaster() - server := podsapi.NewPodsServerForTest(broadcaster) + mockManager := new(kubepodtest.MockManager) + server := podsapi.NewPodsServerForTest(broadcaster, mockManager) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}} pod1.Spec.EphemeralContainers = []v1.EphemeralContainer{ { @@ -101,9 +105,10 @@ func TestGetPod(t *testing.T) { }, }, } - status1 := v1.PodStatus{Phase: v1.PodRunning} - server.OnPodAdded(pod1) - server.OnPodStatusUpdated(pod1, status1) + pod1.Status = v1.PodStatus{Phase: v1.PodRunning} + + mockManager.On("GetPodByUID", types.UID("pod1-uid")).Return(pod1, true) + resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "pod1-uid"}) require.NoError(t, err) podOut := &v1.Pod{} @@ -119,12 +124,41 @@ func TestGetPod(t *testing.T) { assert.True(t, podOut.Spec.EphemeralContainers[0].TTY) } +func TestStaticPod(t *testing.T) { + broadcaster := podsapi.NewBroadcaster() + mockManager := new(kubepodtest.MockManager) + server := podsapi.NewPodsServerForTest(broadcaster, mockManager) + + staticPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "static-pod-uid", + Name: "static-pod", + Namespace: "default", + Annotations: map[string]string{ + "kubernetes.io/config.source": "file", + }, + }, + } + + mockManager.On("GetPodByUID", types.UID("static-pod-uid")).Return(staticPod, true) + + resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "static-pod-uid"}) + require.NoError(t, err) + + podOut := &v1.Pod{} + err = 
podOut.Unmarshal(resp.Pod) + require.NoError(t, err) + assert.Equal(t, "static-pod", podOut.Name) + assert.Equal(t, "file", podOut.Annotations["kubernetes.io/config.source"]) +} + func TestErrorsAndMetrics(t *testing.T) { metrics.Register() t.Run("DroppedWatchEventIncrementsMetric", func(t *testing.T) { broadcaster := podsapi.NewBroadcaster() - server := podsapi.NewPodsServerForTest(broadcaster) + mockManager := new(kubepodtest.MockManager) + server := podsapi.NewPodsServerForTest(broadcaster, mockManager) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}} server.OnPodAdded(pod1) @@ -135,9 +169,9 @@ func TestErrorsAndMetrics(t *testing.T) { defer broadcaster.Unregister(clientChannel) // Send two events. The first one should fill the buffer. - broadcaster.Broadcast(podsapi.PodWatchEvent{}) + broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "1"}) // The second one should be dropped and increment the metric. - broadcaster.Broadcast(podsapi.PodWatchEvent{}) + broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "2"}) err := testutil.CollectAndCompare(metrics.PodWatchEventsDroppedTotal, strings.NewReader(` # HELP kubelet_pod_watch_events_dropped_total [ALPHA] Cumulative number of pod watch events dropped. 
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 75053c61622..8d8fd3c42f2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -698,7 +698,7 @@ func NewMainKubelet(ctx context.Context, if utilfeature.DefaultFeatureGate.Enabled(features.PodsAPI) { broadcaster := pods.NewBroadcaster() - klet.podsServer = pods.NewPodsServer(broadcaster) + klet.podsServer = pods.NewPodsServer(broadcaster, klet.podManager) podSubscribers = append(podSubscribers, klet.podsServer) statusSubscribers = append(statusSubscribers, klet.podsServer) } diff --git a/test/e2e_node/pods_api_test.go b/test/e2e_node/pods_api_test.go index c3033cce754..383e8ae3228 100644 --- a/test/e2e_node/pods_api_test.go +++ b/test/e2e_node/pods_api_test.go @@ -20,6 +20,9 @@ package e2enode import ( "context" + "encoding/json" + "os" + "path/filepath" "time" "github.com/onsi/ginkgo/v2" @@ -39,6 +42,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" admissionapi "k8s.io/pod-security-admission/api" + "sigs.k8s.io/yaml" ) // podsAPISuite is a Ginkgo test suite for the Kubelet Pods API. 
@@ -195,5 +199,93 @@ var _ = SIGDescribe("Kubelet Pods API", framework.WithSerial(), func() { )), "did not receive DELETED event for the test pod") }) + ginkgo.It("should be able to list and watch static pods", func(ctx context.Context) { + ginkgo.By("watching for pod events") + watchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + watchClient, err := client.WatchPods(watchCtx, &podsv1alpha1.WatchPodsRequest{}) + framework.ExpectNoError(err, "failed to watch pods") + + eventsCh := make(chan *podsv1alpha1.WatchPodsEvent, 100) + go func() { + defer ginkgo.GinkgoRecover() + for { + ev, err := watchClient.Recv() + if err != nil { + return + } + select { + case eventsCh <- ev: + case <-watchCtx.Done(): + return + } + } + }() + + ginkgo.By("creating a static pod manifest") + staticPodName := "static-pod-" + string(uuid.NewUUID()) + staticPodPath := filepath.Join(kubeletCfg.StaticPodPath, f.Namespace.Name+"-"+staticPodName+".yaml") + + pod := &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: staticPodName, + Namespace: f.Namespace.Name, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container", + Image: "busybox:1.36", + Command: []string{"sleep", "3600"}, + }, + }, + }, + } + + podBytes, err := json.Marshal(pod) + framework.ExpectNoError(err, "failed to marshal pod") + podYaml, err := yaml.JSONToYAML(podBytes) + framework.ExpectNoError(err, "failed to convert to yaml") + + err = os.WriteFile(staticPodPath, podYaml, 0644) + framework.ExpectNoError(err, "failed to write static pod file") + + defer func() { + os.Remove(staticPodPath) + }() + + ginkgo.By("waiting for the static pod ADDED event") + gomega.Eventually(eventsCh, "2m").Should(gomega.Receive(gomega.SatisfyAll( + gomega.WithTransform(func(e *podsv1alpha1.WatchPodsEvent) podsv1alpha1.EventType { return e.Type }, gomega.Equal(podsv1alpha1.EventType_ADDED)), + gomega.WithTransform(func(e 
*podsv1alpha1.WatchPodsEvent) bool { + var p v1.Pod + if err := p.Unmarshal(e.Pod); err != nil { + return false + } + return p.Namespace == f.Namespace.Name && p.Name == staticPodName+"-"+framework.TestContext.NodeName + }, gomega.BeTrue()), + )), "did not receive ADDED event") + + ginkgo.By("deleting the static pod manifest") + err = os.Remove(staticPodPath) + framework.ExpectNoError(err, "failed to delete static pod file") + + ginkgo.By("waiting for the static pod DELETED event") + gomega.Eventually(eventsCh, "2m").Should(gomega.Receive(gomega.SatisfyAll( + gomega.WithTransform(func(e *podsv1alpha1.WatchPodsEvent) podsv1alpha1.EventType { return e.Type }, gomega.Equal(podsv1alpha1.EventType_DELETED)), + gomega.WithTransform(func(e *podsv1alpha1.WatchPodsEvent) bool { + var p v1.Pod + if err := p.Unmarshal(e.Pod); err != nil { + return false + } + return p.Namespace == f.Namespace.Name && p.Name == staticPodName+"-"+framework.TestContext.NodeName + }, gomega.BeTrue()), + )), "did not receive DELETED event") + }) + }) })