Refactor PodsServer to use PodManager as source of truth

- Fixed version in kube_features.go after rebase (1.35->1.36)
- Removed internal pod cache in PodsServer to reduce memory footprint and avoid duplication.
- Injected pod.Manager into PodsServer to serve as the single source of truth for pod data.
- Refactored WatchPods to broadcast UIDs and fetch fresh pod data from podManager, ensuring consistency.
- Updated convertWatchEventType to safely handle unknown event types.
- Refactored unit tests to use MockManager and added a test case for static pods.
- Updated the e2e suite with a static pod test.
This commit is contained in:
Brian Sonnenberg 2025-12-16 20:29:55 +00:00
parent 044f65ca5c
commit fd330c303d
5 changed files with 208 additions and 89 deletions

View file

@ -1734,7 +1734,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
},
PodsAPI: {
{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.36"), Default: false, PreRelease: featuregate.Alpha},
},
PortForwardWebsockets: {

View file

@ -31,10 +31,12 @@ import (
"k8s.io/klog/v2"
podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/pod"
)
// PodWatchEvent is the message handed to the broadcaster and delivered to
// registered watch client channels.
type PodWatchEvent struct {
// Type is the kind of change being reported (watch.Added, watch.Modified or
// watch.Deleted are the values produced in this file).
Type watch.EventType
// UID identifies the pod the event refers to. For non-Deleted events
// WatchPods uses it to fetch the pod from the pod manager.
UID types.UID
// Pod optionally carries the pod object itself. WatchPods only reads it for
// Deleted events; for other event types it looks the pod up by UID instead.
Pod *v1.Pod
}
@ -82,92 +84,52 @@ func (b *broadcaster) Broadcast(event PodWatchEvent) {
// PodsServer is the gRPC server that provides pod information.
type PodsServer struct {
podsv1alpha1.UnimplementedPodsServer
lock sync.RWMutex
pods map[types.UID]*v1.Pod
podManager pod.Manager
broadcaster *broadcaster
}
// NewPodsServer creates a new PodsServer for production use.
func NewPodsServer(broadcaster *broadcaster) *PodsServer {
func NewPodsServer(broadcaster *broadcaster, podManager pod.Manager) *PodsServer {
return &PodsServer{
pods: make(map[types.UID]*v1.Pod),
podManager: podManager,
broadcaster: broadcaster,
}
}
// NewPodsServerForTest creates a new PodsServer for use in tests.
func NewPodsServerForTest(broadcaster *broadcaster) *PodsServer {
func NewPodsServerForTest(broadcaster *broadcaster, podManager pod.Manager) *PodsServer {
return &PodsServer{
pods: make(map[types.UID]*v1.Pod),
podManager: podManager,
broadcaster: broadcaster,
}
}
// OnPodAdded is called when a pod is added.
func (s *PodsServer) OnPodAdded(pod *v1.Pod) {
s.lock.Lock()
defer s.lock.Unlock()
s.pods[pod.UID] = pod.DeepCopy()
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Added, Pod: pod})
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Added, UID: pod.UID})
logger := klog.FromContext(context.Background())
logger.Info("Pod added to storage", "podUID", pod.UID)
}
// OnPodUpdated is called when a pod is updated.
func (s *PodsServer) OnPodUpdated(pod *v1.Pod) {
s.lock.Lock()
defer s.lock.Unlock()
s.pods[pod.UID] = pod.DeepCopy()
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, Pod: pod})
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, UID: pod.UID})
logger := klog.FromContext(context.Background())
logger.Info("Pod updated in storage", "podUID", pod.UID)
}
// OnPodRemoved is called when a pod is removed.
func (s *PodsServer) OnPodRemoved(pod *v1.Pod) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.pods, pod.UID)
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Deleted, Pod: pod.DeepCopy()})
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Deleted, UID: pod.UID, Pod: pod})
logger := klog.FromContext(context.Background())
logger.Info("Pod removed from storage", "podUID", pod.UID)
}
// OnPodStatusUpdated is called when a pod's status is updated.
func (s *PodsServer) OnPodStatusUpdated(pod *v1.Pod, status v1.PodStatus) {
s.lock.Lock()
defer s.lock.Unlock()
if storedPod, ok := s.pods[pod.UID]; ok {
storedPod.Status = status
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, Pod: storedPod})
logger := klog.FromContext(context.Background())
logger.Info("Pod status updated in storage", "podUID", pod.UID)
}
}
// Get returns a pod by UID.
func (s *PodsServer) Get(uid types.UID) (*v1.Pod, bool) {
s.lock.RLock()
defer s.lock.RUnlock()
pod, ok := s.pods[uid]
if !ok {
return nil, false
}
return pod.DeepCopy(), true
}
// List returns all pods.
func (s *PodsServer) List() []*v1.Pod {
s.lock.RLock()
defer s.lock.RUnlock()
pods := make([]*v1.Pod, 0, len(s.pods))
for _, pod := range s.pods {
pods = append(pods, pod.DeepCopy())
}
sort.Slice(pods, func(i, j int) bool {
return pods[i].UID < pods[j].UID
})
return pods
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, UID: pod.UID})
logger := klog.FromContext(context.Background())
logger.Info("Pod status updated in storage", "podUID", pod.UID)
}
// ListPods returns a list of pods.
@ -176,7 +138,10 @@ func (s *PodsServer) ListPods(ctx context.Context, req *podsv1alpha1.ListPodsReq
logger.Info("ListPods called")
// TODO: Implement filtering based on req.Filter, pagination with req.PageToken and req.PageSize
podsToReturn := s.List()
podsToReturn := s.podManager.GetPods()
sort.Slice(podsToReturn, func(i, j int) bool {
return podsToReturn[i].UID < podsToReturn[j].UID
})
protoPods := make([][]byte, len(podsToReturn))
for i, p := range podsToReturn {
@ -196,7 +161,7 @@ func (s *PodsServer) GetPod(ctx context.Context, req *podsv1alpha1.GetPodRequest
logger.Info("GetPod called", "podUID", req.PodUID)
podUID := types.UID(req.PodUID)
pod, ok := s.Get(podUID)
pod, ok := s.podManager.GetPodByUID(podUID)
if !ok {
return nil, status.Errorf(codes.NotFound, "pod with UID %s not found", req.PodUID)
}
@ -226,7 +191,11 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1
}()
// Send initial ADDED events
initialPods := s.List()
initialPods := s.podManager.GetPods()
sort.Slice(initialPods, func(i, j int) bool {
return initialPods[i].UID < initialPods[j].UID
})
for _, p := range initialPods {
podBytes, err := p.Marshal()
if err != nil {
@ -249,14 +218,38 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1
logger.Info("Watch context cancelled", "client", clientAddr)
return stream.Context().Err()
case event := <-clientChannel:
podBytes, err := event.Pod.Marshal()
var podToMarshal *v1.Pod
if event.Type == watch.Deleted {
podToMarshal = event.Pod
} else {
p, ok := s.podManager.GetPodByUID(event.UID)
if ok {
podToMarshal = p
} else {
logger.Info("Pod not found in manager during watch event processing", "uid", event.UID, "type", event.Type)
continue
}
}
if podToMarshal == nil {
continue
}
podBytes, err := podToMarshal.Marshal()
if err != nil {
logger.Error(err, "Error marshalling watch event pod")
metrics.PodWatchEventsDroppedTotal.Inc()
continue
}
eventType, err := convertWatchEventType(event.Type)
if err != nil {
logger.Error(err, "Unknown watch event type")
continue
}
if err := stream.Send(&podsv1alpha1.WatchPodsEvent{
Type: convertWatchEventType(event.Type),
Type: eventType,
Pod: podBytes,
}); err != nil {
logger.Error(err, "Error sending watch event to client", "client", clientAddr)
@ -266,15 +259,15 @@ func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1
}
}
func convertWatchEventType(watchType watch.EventType) podsv1alpha1.EventType {
func convertWatchEventType(watchType watch.EventType) (podsv1alpha1.EventType, error) {
switch watchType {
case watch.Added:
return podsv1alpha1.EventType_ADDED
return podsv1alpha1.EventType_ADDED, nil
case watch.Modified:
return podsv1alpha1.EventType_MODIFIED
return podsv1alpha1.EventType_MODIFIED, nil
case watch.Deleted:
return podsv1alpha1.EventType_DELETED
return podsv1alpha1.EventType_DELETED, nil
default:
return podsv1alpha1.EventType_ADDED
return podsv1alpha1.EventType_ADDED, status.Errorf(codes.Internal, "unknown watch event type: %v", watchType)
}
}
}

View file

@ -30,50 +30,53 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
metafuzzer "k8s.io/apimachinery/pkg/apis/meta/fuzzer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-base/metrics/testutil"
podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1"
"k8s.io/kubernetes/pkg/api/legacyscheme"
corefuzzer "k8s.io/kubernetes/pkg/apis/core/fuzzer"
podsapi "k8s.io/kubernetes/pkg/kubelet/apis/pods"
podsapi "k8s.io/kubernetes/pkg/kubelet/apis/pods"
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubepodtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
)
func TestStartEventLoop(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
mockManager := new(kubepodtest.MockManager)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
pod1Running := pod1.DeepCopy()
pod1Running.Status = v1.PodStatus{Phase: v1.PodRunning}
pod1Succeeded := pod1.DeepCopy()
pod1Succeeded.Status = v1.PodStatus{Phase: v1.PodSucceeded}
clientChannel := make(chan podsapi.PodWatchEvent, 100)
broadcaster.Register(clientChannel)
defer broadcaster.Unregister(clientChannel)
server.OnPodAdded(pod1Running)
server.OnPodStatusUpdated(pod1, pod1Succeeded.Status)
server.OnPodAdded(pod1)
server.OnPodStatusUpdated(pod1, v1.PodStatus{Phase: v1.PodSucceeded})
server.OnPodRemoved(pod1)
event := <-clientChannel
assert.Equal(t, "ADDED", string(event.Type))
assert.Equal(t, pod1.UID, event.Pod.UID)
assert.Equal(t, pod1.UID, event.UID)
event = <-clientChannel
assert.Equal(t, "MODIFIED", string(event.Type))
assert.Equal(t, v1.PodSucceeded, event.Pod.Status.Phase)
assert.Equal(t, pod1.UID, event.UID)
event = <-clientChannel
assert.Equal(t, "DELETED", string(event.Type))
assert.Equal(t, pod1.UID, event.Pod.UID)
assert.Equal(t, pod1.UID, event.UID)
assert.Equal(t, pod1, event.Pod)
}
func TestListPods(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
mockManager := new(kubepodtest.MockManager)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod2-uid", Name: "pod2", Namespace: "ns2"}}
status1 := v1.PodStatus{Phase: v1.PodRunning}
status2 := v1.PodStatus{Phase: v1.PodSucceeded}
server.OnPodAdded(pod1)
server.OnPodAdded(pod2)
server.OnPodStatusUpdated(pod1, status1)
server.OnPodStatusUpdated(pod2, status2)
mockManager.On("GetPods").Return([]*v1.Pod{pod1, pod2})
resp, err := server.ListPods(context.Background(), &podsv1alpha1.ListPodsRequest{})
require.NoError(t, err)
pod1Out := &v1.Pod{}
@ -88,7 +91,8 @@ func TestListPods(t *testing.T) {
func TestGetPod(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
mockManager := new(kubepodtest.MockManager)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
pod1.Spec.EphemeralContainers = []v1.EphemeralContainer{
{
@ -101,9 +105,10 @@ func TestGetPod(t *testing.T) {
},
},
}
status1 := v1.PodStatus{Phase: v1.PodRunning}
server.OnPodAdded(pod1)
server.OnPodStatusUpdated(pod1, status1)
pod1.Status = v1.PodStatus{Phase: v1.PodRunning}
mockManager.On("GetPodByUID", types.UID("pod1-uid")).Return(pod1, true)
resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "pod1-uid"})
require.NoError(t, err)
podOut := &v1.Pod{}
@ -119,12 +124,41 @@ func TestGetPod(t *testing.T) {
assert.True(t, podOut.Spec.EphemeralContainers[0].TTY)
}
// TestStaticPod verifies that GetPod returns a pod annotated with
// kubernetes.io/config.source=file (the test's stand-in for a static pod)
// when the pod manager reports it, and that its identifying fields survive
// the marshal/unmarshal round trip through the response.
func TestStaticPod(t *testing.T) {
	broadcaster := podsapi.NewBroadcaster()
	mockManager := new(kubepodtest.MockManager)
	server := podsapi.NewPodsServerForTest(broadcaster, mockManager)

	staticPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       "static-pod-uid",
			Name:      "static-pod",
			Namespace: "default",
			Annotations: map[string]string{
				"kubernetes.io/config.source": "file",
			},
		},
	}

	mockManager.On("GetPodByUID", types.UID("static-pod-uid")).Return(staticPod, true)

	resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "static-pod-uid"})
	require.NoError(t, err)

	podOut := &v1.Pod{}
	require.NoError(t, podOut.Unmarshal(resp.Pod))

	assert.Equal(t, staticPod.UID, podOut.UID)
	assert.Equal(t, "static-pod", podOut.Name)
	assert.Equal(t, "file", podOut.Annotations["kubernetes.io/config.source"])

	// The mock is created with new(...), so nothing checks the expectation
	// automatically; assert explicitly that GetPod consulted the pod manager.
	mockManager.AssertExpectations(t)
}
func TestErrorsAndMetrics(t *testing.T) {
metrics.Register()
t.Run("DroppedWatchEventIncrementsMetric", func(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
mockManager := new(kubepodtest.MockManager)
server := podsapi.NewPodsServerForTest(broadcaster, mockManager)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
server.OnPodAdded(pod1)
@ -135,9 +169,9 @@ func TestErrorsAndMetrics(t *testing.T) {
defer broadcaster.Unregister(clientChannel)
// Send two events. The first one should fill the buffer.
broadcaster.Broadcast(podsapi.PodWatchEvent{})
broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "1"})
// The second one should be dropped and increment the metric.
broadcaster.Broadcast(podsapi.PodWatchEvent{})
broadcaster.Broadcast(podsapi.PodWatchEvent{UID: "2"})
err := testutil.CollectAndCompare(metrics.PodWatchEventsDroppedTotal, strings.NewReader(`
# HELP kubelet_pod_watch_events_dropped_total [ALPHA] Cumulative number of pod watch events dropped.

View file

@ -698,7 +698,7 @@ func NewMainKubelet(ctx context.Context,
if utilfeature.DefaultFeatureGate.Enabled(features.PodsAPI) {
broadcaster := pods.NewBroadcaster()
klet.podsServer = pods.NewPodsServer(broadcaster)
klet.podsServer = pods.NewPodsServer(broadcaster, klet.podManager)
podSubscribers = append(podSubscribers, klet.podsServer)
statusSubscribers = append(statusSubscribers, klet.podsServer)
}

View file

@ -20,6 +20,9 @@ package e2enode
import (
"context"
"encoding/json"
"os"
"path/filepath"
"time"
"github.com/onsi/ginkgo/v2"
@ -39,6 +42,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
admissionapi "k8s.io/pod-security-admission/api"
"sigs.k8s.io/yaml"
)
// podsAPISuite is a Ginkgo test suite for the Kubelet Pods API.
@ -195,5 +199,93 @@ var _ = SIGDescribe("Kubelet Pods API", framework.WithSerial(), func() {
)), "did not receive DELETED event for the test pod")
})
// Exercises the Pods API watch stream with a static pod: opens a watch,
// writes a pod manifest into the kubelet's static pod directory, waits for
// the matching ADDED event, then deletes the manifest and waits for the
// matching DELETED event.
ginkgo.It("should be able to list and watch static pods", func(ctx context.Context) {
ginkgo.By("watching for pod events")
// Bound the whole watch to five minutes; cancel() also stops the reader
// goroutine below via watchCtx.Done().
watchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
watchClient, err := client.WatchPods(watchCtx, &podsv1alpha1.WatchPodsRequest{})
framework.ExpectNoError(err, "failed to watch pods")
// Buffered so the reader goroutine can run ahead of the assertions.
eventsCh := make(chan *podsv1alpha1.WatchPodsEvent, 100)
// Pump events from the stream into eventsCh. The goroutine exits on the
// first Recv error (the error itself is not reported) or when watchCtx is
// done while a send is pending.
go func() {
defer ginkgo.GinkgoRecover()
for {
ev, err := watchClient.Recv()
if err != nil {
return
}
select {
case eventsCh <- ev:
case <-watchCtx.Done():
return
}
}
}()
ginkgo.By("creating a static pod manifest")
staticPodName := "static-pod-" + string(uuid.NewUUID())
// The manifest file name combines the test namespace and the pod name.
staticPodPath := filepath.Join(kubeletCfg.StaticPodPath, f.Namespace.Name+"-"+staticPodName+".yaml")
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: staticPodName,
Namespace: f.Namespace.Name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test-container",
Image: "busybox:1.36",
Command: []string{"sleep", "3600"},
},
},
},
}
// Serialize via JSON first, then convert to YAML for the manifest file.
podBytes, err := json.Marshal(pod)
framework.ExpectNoError(err, "failed to marshal pod")
podYaml, err := yaml.JSONToYAML(podBytes)
framework.ExpectNoError(err, "failed to convert to yaml")
err = os.WriteFile(staticPodPath, podYaml, 0644)
framework.ExpectNoError(err, "failed to write static pod file")
// Best-effort cleanup in case the test fails before the explicit removal
// below; the error is deliberately ignored because the file is normally
// already gone by the time this runs.
defer func() {
os.Remove(staticPodPath)
}()
ginkgo.By("waiting for the static pod ADDED event")
// Accept only an ADDED event whose payload unmarshals to a pod with the
// expected namespace and name; payloads that fail to unmarshal do not match.
// NOTE(review): the expected name is the manifest name suffixed with
// "-<node name>" — presumably how the kubelet names static pods; confirm.
gomega.Eventually(eventsCh, "2m").Should(gomega.Receive(gomega.SatisfyAll(
gomega.WithTransform(func(e *podsv1alpha1.WatchPodsEvent) podsv1alpha1.EventType { return e.Type }, gomega.Equal(podsv1alpha1.EventType_ADDED)),
gomega.WithTransform(func(e *podsv1alpha1.WatchPodsEvent) bool {
var p v1.Pod
if err := p.Unmarshal(e.Pod); err != nil {
return false
}
return p.Namespace == f.Namespace.Name && p.Name == staticPodName+"-"+framework.TestContext.NodeName
}, gomega.BeTrue()),
)), "did not receive ADDED event")
ginkgo.By("deleting the static pod manifest")
err = os.Remove(staticPodPath)
framework.ExpectNoError(err, "failed to delete static pod file")
ginkgo.By("waiting for the static pod DELETED event")
// Same namespace/name match as above, but for the DELETED event type.
gomega.Eventually(eventsCh, "2m").Should(gomega.Receive(gomega.SatisfyAll(
gomega.WithTransform(func(e *podsv1alpha1.WatchPodsEvent) podsv1alpha1.EventType { return e.Type }, gomega.Equal(podsv1alpha1.EventType_DELETED)),
gomega.WithTransform(func(e *podsv1alpha1.WatchPodsEvent) bool {
var p v1.Pod
if err := p.Unmarshal(e.Pod); err != nil {
return false
}
return p.Namespace == f.Namespace.Name && p.Name == staticPodName+"-"+framework.TestContext.NodeName
}, gomega.BeTrue()),
)), "did not receive DELETED event")
})
})
})