[KEP-4188] New Kubelet gRPC API with endpoint returning local Pod information

This commit is contained in:
Brian Sonnenberg 2025-10-06 21:35:35 +00:00
parent 7a3a6cf4be
commit 044f65ca5c
27 changed files with 1648 additions and 27 deletions

View file

@ -1286,7 +1286,9 @@ func startKubelet(ctx context.Context, k kubelet.Bootstrap, podCfg *config.PodCo
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(ctx, netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), kubeDeps.TracerProvider)
}
go k.ListenAndServePodResources(ctx)
go k.ListenAndServePods(ctx)
}
func createAndInitKubelet(

View file

@ -928,6 +928,7 @@ function codegen::protobindings() {
"staging/src/k8s.io/kubelet/pkg/apis/dra"
"staging/src/k8s.io/kubelet/pkg/apis/deviceplugin"
"staging/src/k8s.io/kubelet/pkg/apis/podresources"
"staging/src/k8s.io/kubelet/pkg/apis/pods"
"staging/src/k8s.io/kms/apis"
"staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2"
"staging/src/k8s.io/kubelet/pkg/apis/pluginregistration"

View file

@ -770,6 +770,12 @@ const (
// similar to how `metadata.annotations` behaves.
PodTopologyLabelsAdmission featuregate.Feature = "PodTopologyLabelsAdmission"
// owner: @briansonnenberg
// kep: https://kep.k8s.io/4188
//
// Enables the PodsAPI feature to expose pod information via a gRPC API on the kubelet.
PodsAPI featuregate.Feature = "PodsAPI"
// owner: @seans3
// kep: http://kep.k8s.io/4006
//
@ -1727,6 +1733,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.35"), Default: true, PreRelease: featuregate.Beta},
},
PodsAPI: {
{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
},
PortForwardWebsockets: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta},
@ -2499,6 +2509,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
PodTopologyLabelsAdmission: {},
PodsAPI: {},
PortForwardWebsockets: {},
PreferSameTrafficDistribution: {},

View file

@ -2412,7 +2412,7 @@ func TestRecordPodDeferredAcceptedResizes(t *testing.T) {
func makeAllocationManager(t *testing.T, runtime *containertest.FakeRuntime, allocatedPods []*v1.Pod, nodeConfig *cm.NodeConfig) Manager {
t.Helper()
logger, _ := ktesting.NewTestContext(t)
statusManager := status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker())
statusManager := status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), nil)
var containerManager *cm.FakeContainerManager
if nodeConfig == nil {
containerManager = cm.NewFakeContainerManager()

View file

@ -0,0 +1,26 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pods
const (
Resource = "/pods"
Socket = "pods-api"
// Duplicated from PodResources API
DefaultQPS = 100
DefaultBurstTokens = 10
)

View file

@ -0,0 +1,280 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pods
import (
"context"
"sort"
"sync"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/metrics"
)
type PodWatchEvent struct {
Type watch.EventType
Pod *v1.Pod
}
type broadcaster struct {
lock sync.Mutex
clients map[chan PodWatchEvent]struct{}
}
func NewBroadcaster() *broadcaster {
return &broadcaster{
clients: make(map[chan PodWatchEvent]struct{}),
}
}
func (b *broadcaster) Register(client chan PodWatchEvent) {
b.lock.Lock()
defer b.lock.Unlock()
b.clients[client] = struct{}{}
logger := klog.FromContext(context.Background())
logger.Info("Registered new watch client", "totalClients", len(b.clients))
}
func (b *broadcaster) Unregister(client chan PodWatchEvent) {
b.lock.Lock()
defer b.lock.Unlock()
delete(b.clients, client)
logger := klog.FromContext(context.Background())
logger.Info("Unregistered watch client", "totalClients", len(b.clients))
}
func (b *broadcaster) Broadcast(event PodWatchEvent) {
b.lock.Lock()
defer b.lock.Unlock()
logger := klog.FromContext(context.Background())
for client := range b.clients {
select {
case client <- event:
default:
logger.Info("Watch client channel is full, dropping event.")
metrics.PodWatchEventsDroppedTotal.Inc()
}
}
}
// PodsServer is the gRPC server that provides pod information.
type PodsServer struct {
podsv1alpha1.UnimplementedPodsServer
lock sync.RWMutex
pods map[types.UID]*v1.Pod
broadcaster *broadcaster
}
// NewPodsServer creates a new PodServer for production use.
func NewPodsServer(broadcaster *broadcaster) *PodsServer {
return &PodsServer{
pods: make(map[types.UID]*v1.Pod),
broadcaster: broadcaster,
}
}
// NewPodsServerForTest creates a new PodServer with an injectable ticker for testing.
func NewPodsServerForTest(broadcaster *broadcaster) *PodsServer {
return &PodsServer{
pods: make(map[types.UID]*v1.Pod),
broadcaster: broadcaster,
}
}
// OnPodAdded is called when a pod is added.
func (s *PodsServer) OnPodAdded(pod *v1.Pod) {
s.lock.Lock()
defer s.lock.Unlock()
s.pods[pod.UID] = pod.DeepCopy()
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Added, Pod: pod})
logger := klog.FromContext(context.Background())
logger.Info("Pod added to storage", "podUID", pod.UID)
}
// OnPodUpdated is called when a pod is updated.
func (s *PodsServer) OnPodUpdated(pod *v1.Pod) {
s.lock.Lock()
defer s.lock.Unlock()
s.pods[pod.UID] = pod.DeepCopy()
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, Pod: pod})
logger := klog.FromContext(context.Background())
logger.Info("Pod updated in storage", "podUID", pod.UID)
}
// OnPodRemoved is called when a pod is removed.
func (s *PodsServer) OnPodRemoved(pod *v1.Pod) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.pods, pod.UID)
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Deleted, Pod: pod.DeepCopy()})
logger := klog.FromContext(context.Background())
logger.Info("Pod removed from storage", "podUID", pod.UID)
}
// OnPodStatusUpdated is called when a pod's status is updated.
func (s *PodsServer) OnPodStatusUpdated(pod *v1.Pod, status v1.PodStatus) {
s.lock.Lock()
defer s.lock.Unlock()
if storedPod, ok := s.pods[pod.UID]; ok {
storedPod.Status = status
s.broadcaster.Broadcast(PodWatchEvent{Type: watch.Modified, Pod: storedPod})
logger := klog.FromContext(context.Background())
logger.Info("Pod status updated in storage", "podUID", pod.UID)
}
}
// Get returns a pod by UID.
func (s *PodsServer) Get(uid types.UID) (*v1.Pod, bool) {
s.lock.RLock()
defer s.lock.RUnlock()
pod, ok := s.pods[uid]
if !ok {
return nil, false
}
return pod.DeepCopy(), true
}
// List returns all pods.
func (s *PodsServer) List() []*v1.Pod {
s.lock.RLock()
defer s.lock.RUnlock()
pods := make([]*v1.Pod, 0, len(s.pods))
for _, pod := range s.pods {
pods = append(pods, pod.DeepCopy())
}
sort.Slice(pods, func(i, j int) bool {
return pods[i].UID < pods[j].UID
})
return pods
}
// ListPods returns a list of pods.
func (s *PodsServer) ListPods(ctx context.Context, req *podsv1alpha1.ListPodsRequest) (*podsv1alpha1.ListPodsResponse, error) {
logger := klog.FromContext(ctx)
logger.Info("ListPods called")
// TODO: Implement filtering based on req.Filter, pagination with req.PageToken and req.PageSize
podsToReturn := s.List()
protoPods := make([][]byte, len(podsToReturn))
for i, p := range podsToReturn {
podBytes, err := p.Marshal()
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to marshal pod: %v", err)
}
protoPods[i] = podBytes
}
return &podsv1alpha1.ListPodsResponse{Pods: protoPods}, nil
}
// GetPod returns a single pod by UID.
func (s *PodsServer) GetPod(ctx context.Context, req *podsv1alpha1.GetPodRequest) (*podsv1alpha1.GetPodResponse, error) {
logger := klog.FromContext(ctx)
logger.Info("GetPod called", "podUID", req.PodUID)
podUID := types.UID(req.PodUID)
pod, ok := s.Get(podUID)
if !ok {
return nil, status.Errorf(codes.NotFound, "pod with UID %s not found", req.PodUID)
}
podBytes, err := pod.Marshal()
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to marshal pod: %v", err)
}
return &podsv1alpha1.GetPodResponse{Pod: podBytes}, nil
}
// WatchPods streams pod events.
func (s *PodsServer) WatchPods(req *podsv1alpha1.WatchPodsRequest, stream podsv1alpha1.Pods_WatchPodsServer) error {
clientAddr := "unknown"
if p, ok := peer.FromContext(stream.Context()); ok {
clientAddr = p.Addr.String()
}
logger := klog.FromContext(stream.Context())
logger.Info("WatchPods called", "client", clientAddr)
clientChannel := make(chan PodWatchEvent, 100)
s.broadcaster.Register(clientChannel)
defer func() {
s.broadcaster.Unregister(clientChannel)
logger.Info("Watch client disconnected", "client", clientAddr)
}()
// Send initial ADDED events
initialPods := s.List()
for _, p := range initialPods {
podBytes, err := p.Marshal()
if err != nil {
logger.Error(err, "Error marshalling initial watch event pod")
metrics.PodWatchEventsDroppedTotal.Inc()
continue
}
if err := stream.Send(&podsv1alpha1.WatchPodsEvent{
Type: podsv1alpha1.EventType_ADDED,
Pod: podBytes,
}); err != nil {
logger.Error(err, "Error sending initial watch event")
return err
}
}
for {
select {
case <-stream.Context().Done():
logger.Info("Watch context cancelled", "client", clientAddr)
return stream.Context().Err()
case event := <-clientChannel:
podBytes, err := event.Pod.Marshal()
if err != nil {
logger.Error(err, "Error marshalling watch event pod")
metrics.PodWatchEventsDroppedTotal.Inc()
continue
}
if err := stream.Send(&podsv1alpha1.WatchPodsEvent{
Type: convertWatchEventType(event.Type),
Pod: podBytes,
}); err != nil {
logger.Error(err, "Error sending watch event to client", "client", clientAddr)
return err
}
}
}
}
func convertWatchEventType(watchType watch.EventType) podsv1alpha1.EventType {
switch watchType {
case watch.Added:
return podsv1alpha1.EventType_ADDED
case watch.Modified:
return podsv1alpha1.EventType_MODIFIED
case watch.Deleted:
return podsv1alpha1.EventType_DELETED
default:
return podsv1alpha1.EventType_ADDED
}
}

View file

@ -0,0 +1,183 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pods_test
import (
"context"
"math/rand"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/apitesting/fuzzer"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metafuzzer "k8s.io/apimachinery/pkg/apis/meta/fuzzer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1"
"k8s.io/kubernetes/pkg/api/legacyscheme"
corefuzzer "k8s.io/kubernetes/pkg/apis/core/fuzzer"
podsapi "k8s.io/kubernetes/pkg/kubelet/apis/pods"
"k8s.io/kubernetes/pkg/kubelet/metrics"
)
func TestStartEventLoop(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
pod1Running := pod1.DeepCopy()
pod1Running.Status = v1.PodStatus{Phase: v1.PodRunning}
pod1Succeeded := pod1.DeepCopy()
pod1Succeeded.Status = v1.PodStatus{Phase: v1.PodSucceeded}
clientChannel := make(chan podsapi.PodWatchEvent, 100)
broadcaster.Register(clientChannel)
defer broadcaster.Unregister(clientChannel)
server.OnPodAdded(pod1Running)
server.OnPodStatusUpdated(pod1, pod1Succeeded.Status)
server.OnPodRemoved(pod1)
event := <-clientChannel
assert.Equal(t, "ADDED", string(event.Type))
assert.Equal(t, pod1.UID, event.Pod.UID)
event = <-clientChannel
assert.Equal(t, "MODIFIED", string(event.Type))
assert.Equal(t, v1.PodSucceeded, event.Pod.Status.Phase)
event = <-clientChannel
assert.Equal(t, "DELETED", string(event.Type))
assert.Equal(t, pod1.UID, event.Pod.UID)
}
func TestListPods(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod2-uid", Name: "pod2", Namespace: "ns2"}}
status1 := v1.PodStatus{Phase: v1.PodRunning}
status2 := v1.PodStatus{Phase: v1.PodSucceeded}
server.OnPodAdded(pod1)
server.OnPodAdded(pod2)
server.OnPodStatusUpdated(pod1, status1)
server.OnPodStatusUpdated(pod2, status2)
resp, err := server.ListPods(context.Background(), &podsv1alpha1.ListPodsRequest{})
require.NoError(t, err)
pod1Out := &v1.Pod{}
err = pod1Out.Unmarshal(resp.Pods[0])
require.NoError(t, err)
pod2Out := &v1.Pod{}
err = pod2Out.Unmarshal(resp.Pods[1])
require.NoError(t, err)
require.Equal(t, "pod1", pod1Out.Name)
assert.Equal(t, "pod2", pod2Out.Name)
}
func TestGetPod(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
pod1.Spec.EphemeralContainers = []v1.EphemeralContainer{
{
EphemeralContainerCommon: v1.EphemeralContainerCommon{
Name: "debugger",
Image: "busybox",
Command: []string{"sh"},
Stdin: true,
TTY: true,
},
},
}
status1 := v1.PodStatus{Phase: v1.PodRunning}
server.OnPodAdded(pod1)
server.OnPodStatusUpdated(pod1, status1)
resp, err := server.GetPod(context.Background(), &podsv1alpha1.GetPodRequest{PodUID: "pod1-uid"})
require.NoError(t, err)
podOut := &v1.Pod{}
err = podOut.Unmarshal(resp.Pod)
require.NoError(t, err)
require.Equal(t, "pod1", podOut.Name)
assert.Equal(t, v1.PodRunning, podOut.Status.Phase)
require.Len(t, podOut.Spec.EphemeralContainers, 1)
assert.Equal(t, "debugger", podOut.Spec.EphemeralContainers[0].Name)
assert.Equal(t, "busybox", podOut.Spec.EphemeralContainers[0].Image)
assert.Equal(t, []string{"sh"}, podOut.Spec.EphemeralContainers[0].Command)
assert.True(t, podOut.Spec.EphemeralContainers[0].Stdin)
assert.True(t, podOut.Spec.EphemeralContainers[0].TTY)
}
func TestErrorsAndMetrics(t *testing.T) {
metrics.Register()
t.Run("DroppedWatchEventIncrementsMetric", func(t *testing.T) {
broadcaster := podsapi.NewBroadcaster()
server := podsapi.NewPodsServerForTest(broadcaster)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod1-uid", Name: "pod1", Namespace: "ns1"}}
server.OnPodAdded(pod1)
// Reset the metric before the test
metrics.PodWatchEventsDroppedTotal.Reset()
clientChannel := make(chan podsapi.PodWatchEvent, 1) // Buffered channel of size 1
broadcaster.Register(clientChannel)
defer broadcaster.Unregister(clientChannel)
// Send two events. The first one should fill the buffer.
broadcaster.Broadcast(podsapi.PodWatchEvent{})
// The second one should be dropped and increment the metric.
broadcaster.Broadcast(podsapi.PodWatchEvent{})
err := testutil.CollectAndCompare(metrics.PodWatchEventsDroppedTotal, strings.NewReader(`
# HELP kubelet_pod_watch_events_dropped_total [ALPHA] Cumulative number of pod watch events dropped.
# TYPE kubelet_pod_watch_events_dropped_total counter
kubelet_pod_watch_events_dropped_total 1
`))
require.NoError(t, err)
})
}
func TestSerialize(t *testing.T) {
apiObjectFuzzer := fuzzer.FuzzerFor(fuzzer.MergeFuzzerFuncs(metafuzzer.Funcs, corefuzzer.Funcs), rand.NewSource(152), legacyscheme.Codecs)
for i := 0; i < 100; i++ {
pod := &v1.Pod{}
apiObjectFuzzer.Fill(pod)
podBytes, err := pod.Marshal()
if err != nil {
t.Fatal(err)
}
resp := &podsv1alpha1.ListPodsResponse{Pods: [][]byte{podBytes}}
data, err := proto.Marshal(resp)
if err != nil {
t.Fatal(err)
}
resp2 := &podsv1alpha1.ListPodsResponse{}
if err := proto.Unmarshal(data, resp2); err != nil {
t.Fatal(err)
}
if !proto.Equal(resp, resp2) {
t.Fatal("round-tripped objects were different")
}
pod2 := &v1.Pod{}
if err := pod2.Unmarshal(resp2.Pods[0]); err != nil {
t.Fatal(err)
}
if !apiequality.Semantic.DeepEqual(pod, pod2) {
t.Fatal("round-tripped objects were different")
}
}
}

View file

@ -91,6 +91,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/apis/pods"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
@ -126,6 +127,7 @@ import (
serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/subscription"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/token"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -302,6 +304,7 @@ type Bootstrap interface {
ListenAndServe(ctx context.Context, kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsConfig *tls.Config, auth server.AuthInterface, tp trace.TracerProvider)
ListenAndServeReadOnly(ctx context.Context, address net.IP, port uint, tp trace.TracerProvider)
ListenAndServePodResources(ctx context.Context)
ListenAndServePods(ctx context.Context)
Run(ctx context.Context, updates <-chan kubetypes.PodUpdate)
}
@ -690,7 +693,17 @@ func NewMainKubelet(ctx context.Context,
klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager()
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
var podSubscribers []subscription.PodUpdateSubscriber
var statusSubscribers []subscription.StatusUpdateSubscriber
if utilfeature.DefaultFeatureGate.Enabled(features.PodsAPI) {
broadcaster := pods.NewBroadcaster()
klet.podsServer = pods.NewPodsServer(broadcaster)
podSubscribers = append(podSubscribers, klet.podsServer)
statusSubscribers = append(statusSubscribers, klet.podsServer)
}
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, statusSubscribers)
klet.allocationManager = allocation.NewManager(
klet.getRootDir(),
klet.statusManager,
@ -732,6 +745,7 @@ func NewMainKubelet(ctx context.Context,
backOffPeriod,
klet.podCache,
klet.allocationManager,
podSubscribers,
)
var singleProcessOOMKill *bool
@ -1570,6 +1584,9 @@ type Kubelet struct {
// flagz is the Reader interface to get flags for flagz page.
flagz flagz.Reader
// podsServer is the server that provides the pods gRPC service.
podsServer *pods.PodsServer
}
// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
@ -3311,23 +3328,41 @@ func (pp *kubeletPodsProvider) GetPodByName(namespace, name string) (*v1.Pod, bo
return pod, true
}
// ListenAndServePodResources runs the kubelet podresources grpc service
// ListenAndServePodResources runs the kubelet podresources grpc service.
func (kl *Kubelet) ListenAndServePodResources(ctx context.Context) {
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.FromContext(ctx).V(2).Info("Failed to get local endpoint for PodResources endpoint", "err", err)
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResourcesGet) {
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.FromContext(ctx).V(2).Info("Failed to get local endpoint for PodResources endpoint", "err", err)
return
}
providers := podresources.PodResourcesProviders{
Pods: &kubeletPodsProvider{kl: kl},
Devices: kl.containerManager,
Cpus: kl.containerManager,
Memory: kl.containerManager,
DynamicResources: kl.containerManager,
}
providers := podresources.PodResourcesProviders{
Pods: &kubeletPodsProvider{kl: kl},
Devices: kl.containerManager,
Cpus: kl.containerManager,
Memory: kl.containerManager,
DynamicResources: kl.containerManager,
}
server.ListenAndServePodResources(ctx, endpoint, providers)
server.ListenAndServePodResources(ctx, endpoint, providers)
}
}
// ListenAndServePod initializes an HTTP server to serve the Pod API.
func (kl *Kubelet) ListenAndServePods(ctx context.Context) {
if utilfeature.DefaultFeatureGate.Enabled(features.PodsAPI) {
endpoint, err := util.LocalEndpoint(kl.getPodsAPIDir(), pods.Socket)
if err != nil {
klog.ErrorS(err, "Failed to get local endpoint for pod api")
return
}
server.ListenAndServePodsServer(
ctx,
endpoint,
kl.podsServer,
)
}
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.

View file

@ -216,11 +216,16 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
return filepath.Join(kl.getPodDir(podUID), kubeletconfig.DefaultKubeletContainersDirName, ctrName)
}
// getPodResourcesSocket returns the full path to the directory containing the pod resources socket
// getPodResourcesDir returns the full path to the directory containing the pod resources socket
func (kl *Kubelet) getPodResourcesDir() string {
return filepath.Join(kl.getRootDir(), kubeletconfig.DefaultKubeletPodResourcesDirName)
}
// getPodsAPIDir returns the full path to the directory containing the pods API socket
func (kl *Kubelet) getPodsAPIDir() string {
return filepath.Join(kl.getRootDir(), kubeletconfig.DefaultKubeletPodsAPIDirName)
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pods.
func (kl *Kubelet) GetPods() []*v1.Pod {

View file

@ -100,6 +100,10 @@ func TestKubeletDirs(t *testing.T) {
exp = filepath.Join(root, "pod-resources")
assert.Equal(t, exp, got)
got = kubelet.getPodsAPIDir()
exp = filepath.Join(root, "pods-api")
assert.Equal(t, exp, got)
got = kubelet.getPodVolumeSubpathsDir("abc123")
exp = filepath.Join(root, "pods/abc123/volume-subpaths")
assert.Equal(t, exp, got)

View file

@ -304,7 +304,7 @@ func newTestKubeletWithImageList(
kubelet.mirrorPodClient = fakeMirrorClient
kubelet.podManager = kubepod.NewBasicPodManager()
kubelet.podStartupLatencyTracker = kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, kubelet.podStartupLatencyTracker)
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, kubelet.podStartupLatencyTracker, nil)
kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
kubelet.podCertificateManager = &podcertificate.NoOpManager{}
@ -782,6 +782,7 @@ func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
kl, kl.recorder, kl.workQueue,
kl.resyncInterval, backOffPeriod,
kl.podCache, kl.allocationManager,
nil,
)
kl.volumeManager = kubeletvolume.NewFakeVolumeManager(nil, 0, nil, true /* volumeAttachLimitExceededError */)

View file

@ -27,6 +27,7 @@ const (
DefaultKubeletContainersDirName = "containers"
DefaultKubeletPluginContainersDirName = "plugin-containers"
DefaultKubeletPodResourcesDirName = "pod-resources"
DefaultKubeletPodsAPIDirName = "pods-api"
KubeletPluginsDirSELinuxLabel = "system_u:object_r:container_file_t:s0"
KubeletContainersSharedSELinuxLabel = "system_u:object_r:container_file_t:s0"
DefaultKubeletCheckpointsDirName = "checkpoints"

View file

@ -198,6 +198,9 @@ const (
// Metric key for podcertificate states.
PodCertificateStatesKey = "podcertificate_states"
// Metric key for podsapi
PodWatchEventsDroppedKey = "pod_watch_events_dropped_total"
)
type imageSizeBucket struct {
@ -1266,6 +1269,16 @@ var (
},
[]string{"resource_name", "assignment_type"},
)
// PodWatchEventsDroppedTotal tracks the number of dropped pod watch events.
PodWatchEventsDroppedTotal = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: KubeletSubsystem,
Name: PodWatchEventsDroppedKey,
Help: "Cumulative number of pod watch events dropped.",
StabilityLevel: metrics.ALPHA,
},
)
)
var registerMetrics sync.Once
@ -1390,6 +1403,8 @@ func Register() {
legacyregistry.MustRegister(ResourceManagerAllocationErrorsTotal)
legacyregistry.MustRegister(ResourceManagerContainerAssignments)
}
legacyregistry.MustRegister(PodWatchEventsDroppedTotal)
})
}

View file

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/subscription"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/utils/clock"
@ -609,6 +610,9 @@ type podWorkers struct {
// clock is used for testing timing
clock clock.PassiveClock
// subscribers are notified of pod updates.
subscribers []subscription.PodUpdateSubscriber
}
func newPodWorkers(
@ -618,6 +622,7 @@ func newPodWorkers(
resyncInterval, backOffPeriod time.Duration,
podCache kubecontainer.ROCache,
allocationManager allocation.Manager,
subscribers []subscription.PodUpdateSubscriber,
) PodWorkers {
return &podWorkers{
podSyncStatuses: map[types.UID]*podSyncStatus{},
@ -632,6 +637,7 @@ func newPodWorkers(
podCache: podCache,
allocationManager: allocationManager,
clock: clock.RealClock{},
subscribers: subscribers,
}
}
@ -982,6 +988,16 @@ func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) {
}
status.working = true
updateLogger.V(4).Info("Notifying pod of pending update", "workType", status.WorkType())
for _, s := range p.subscribers {
switch options.UpdateType {
case kubetypes.SyncPodCreate:
s.OnPodAdded(pod)
case kubetypes.SyncPodUpdate:
s.OnPodUpdated(pod)
case kubetypes.SyncPodKill:
s.OnPodRemoved(pod)
}
}
select {
case podUpdates <- struct{}{}:
default:

View file

@ -461,6 +461,7 @@ func createPodWorkersWithLogger(_ klog.Logger) (*podWorkers, *containertest.Fake
time.Millisecond,
fakeCache,
allocation.NewInMemoryManager(nil, nil, nil, nil, nil, nil),
nil,
)
workers := w.(*podWorkers)
workers.clock = clock
@ -2169,6 +2170,7 @@ func TestFakePodWorkers(t *testing.T) {
time.Second,
fakeCache,
allocation.NewInMemoryManager(nil, nil, nil, nil, nil, nil),
nil,
)
fakePodWorkers := &fakePodWorkers{
syncPodFn: kubeletForFakeWorkers.SyncPod,

View file

@ -157,7 +157,7 @@ func newTestManager() *manager {
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, nil),
results.NewManager(),
results.NewManager(),
results.NewManager(),

View file

@ -84,7 +84,7 @@ func TestTCPPortExhaustion(t *testing.T) {
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, nil),
results.NewManager(),
results.NewManager(),
results.NewManager(),

View file

@ -174,7 +174,7 @@ func TestDoProbe(t *testing.T) {
t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result)
}
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker())
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), nil)
resultsManager(m, probeType).Remove(testContainerID)
}
}

View file

@ -19,6 +19,7 @@ package server
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
@ -82,6 +83,10 @@ import (
remotecommandserver "k8s.io/cri-streaming/pkg/streaming/remotecommand"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1"
"k8s.io/kubelet/pkg/cri/streaming"
"k8s.io/kubelet/pkg/cri/streaming/portforward"
remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
kubelettypes "k8s.io/kubelet/pkg/types"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
@ -90,6 +95,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/apis/pods"
kubeletcadvisor "k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
@ -265,6 +271,32 @@ func ListenAndServePodResources(ctx context.Context, endpoint string, providers
}
}
// ListenAndServePodsServer initializes an HTTP server to serve the Pod API.
func ListenAndServePodsServer(ctx context.Context, endpoint string, srv podsv1alpha1.PodsServer) {
logger := klog.FromContext(ctx)
server := grpc.NewServer(apisgrpc.WithRateLimiter(ctx, "pods", pods.DefaultQPS, pods.DefaultBurstTokens))
podsv1alpha1.RegisterPodsServer(server, srv)
l, err := util.CreateListener(endpoint)
if err != nil {
logger.Error(err, "Failed to create listener for pods API endpoint")
os.Exit(1)
}
logger.Info("Starting to serve the pods API", "endpoint", endpoint)
go func() {
if err := server.Serve(l); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
logger.Error(err, "Failed to serve")
os.Exit(1)
}
}()
<-ctx.Done()
logger.Info("Shutting down pods API server")
server.GracefulStop()
}
type NodeRequestAttributesGetter interface {
GetRequestAttributes(ctx context.Context, u user.Info, r *http.Request) []authorizer.Attributes
}

View file

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/subscription"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
statusutil "k8s.io/kubernetes/pkg/util/pod"
@ -80,6 +81,7 @@ type manager struct {
podDeletionSafety PodDeletionSafetyProvider
podStartupLatencyHelper PodStartupLatencyStateHelper
subscribers []subscription.StatusUpdateSubscriber
}
type podResizeConditions struct {
@ -190,7 +192,7 @@ type Manager interface {
const syncPeriod = 10 * time.Second
// NewManager returns a functional Manager.
func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper) Manager {
func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, subscribers []subscription.StatusUpdateSubscriber) Manager {
return &manager{
kubeClient: kubeClient,
podManager: podManager,
@ -200,6 +202,7 @@ func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeleti
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
podDeletionSafety: podDeletionSafety,
podStartupLatencyHelper: podStartupLatencyHelper,
subscribers: subscribers,
}
}
@ -933,6 +936,10 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v
m.podStatuses[pod.UID] = newStatus
for _, s := range m.subscribers {
s.OnPodStatusUpdated(pod, status)
}
select {
case m.podStatusChannel <- struct{}{}:
default:

View file

@ -98,7 +98,7 @@ func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager()
podManager.(mutablePodManager).AddPod(getTestPod())
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, nil).(*manager)
}
func generateRandomMessage() string {
@ -1132,7 +1132,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, nil).(*manager)
original := tc.pod.DeepCopy()
syncer.SetPodStatus(logger, original, original.Status)
@ -1221,7 +1221,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, nil).(*manager)
pod := getTestPod()
pod.Status = tc.status
@ -2187,7 +2187,7 @@ func TestContainerTerminationMetric(t *testing.T) {
}
func TestPodResizeConditions(t *testing.T) {
m := NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, util.NewPodStartupLatencyTracker())
m := NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, util.NewPodStartupLatencyTracker(), nil)
podUID := types.UID("12345")
testCases := []struct {
@ -2388,7 +2388,7 @@ func TestPodResizeConditions(t *testing.T) {
}
func TestClearPodResizeInProgressCondition(t *testing.T) {
m := NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, util.NewPodStartupLatencyTracker())
m := NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, util.NewPodStartupLatencyTracker(), nil)
podUID := types.UID("12345")
testCases := []struct {

View file

@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package subscription
import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/types"
)
// PodUpdate is a union of a pod update and a pod status update.
type PodUpdate struct {
Pod *v1.Pod
Update types.SyncPodType
}
// PodUpdateSubscriber is an interface for other Kubelet components to subscribe to pod updates.
type PodUpdateSubscriber interface {
// OnPodAdded is called when a pod is added.
OnPodAdded(pod *v1.Pod)
// OnPodUpdated is called when a pod is updated.
OnPodUpdated(pod *v1.Pod)
// OnPodRemoved is called when a pod is removed.
OnPodRemoved(pod *v1.Pod)
}
// StatusUpdate is a union of a pod and a pod status.
type StatusUpdate struct {
Pod *v1.Pod
Status v1.PodStatus
}
// StatusUpdateSubscriber is an interface for other Kubelet components to subscribe to pod status updates.
type StatusUpdateSubscriber interface {
// OnPodStatusUpdated is called when a pod's status is updated.
OnPodStatusUpdated(pod *v1.Pod, status v1.PodStatus)
}

View file

@ -0,0 +1,460 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.4
// protoc v4.23.4
// source: staging/src/k8s.io/kubelet/pkg/apis/pods/v1alpha1/api.proto
package v1alpha1
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type EventType int32
const (
EventType_ADDED EventType = 0
EventType_MODIFIED EventType = 1
EventType_DELETED EventType = 2
)
// Enum value maps for EventType.
var (
EventType_name = map[int32]string{
0: "ADDED",
1: "MODIFIED",
2: "DELETED",
}
EventType_value = map[string]int32{
"ADDED": 0,
"MODIFIED": 1,
"DELETED": 2,
}
)
func (x EventType) Enum() *EventType {
p := new(EventType)
*p = x
return p
}
func (x EventType) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (EventType) Descriptor() protoreflect.EnumDescriptor {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_enumTypes[0].Descriptor()
}
func (EventType) Type() protoreflect.EnumType {
return &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_enumTypes[0]
}
func (x EventType) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use EventType.Descriptor instead.
func (EventType) EnumDescriptor() ([]byte, []int) {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP(), []int{0}
}
// This is sent for each update in the Watch stream
// Pod bytes are generated by calling v1.Pod#Marshal().
// They are convertable to a pod by calling v1.Pod#Unmarshal().
type WatchPodsEvent struct {
state protoimpl.MessageState `protogen:"open.v1"`
// ADDED, MODIFIED, DELETED
Type EventType `protobuf:"varint,1,opt,name=type,proto3,enum=v1alpha1.EventType" json:"type,omitempty"`
Pod []byte `protobuf:"bytes,2,opt,name=pod,proto3" json:"pod,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *WatchPodsEvent) Reset() {
*x = WatchPodsEvent{}
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *WatchPodsEvent) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*WatchPodsEvent) ProtoMessage() {}
func (x *WatchPodsEvent) ProtoReflect() protoreflect.Message {
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use WatchPodsEvent.ProtoReflect.Descriptor instead.
func (*WatchPodsEvent) Descriptor() ([]byte, []int) {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP(), []int{0}
}
func (x *WatchPodsEvent) GetType() EventType {
if x != nil {
return x.Type
}
return EventType_ADDED
}
func (x *WatchPodsEvent) GetPod() []byte {
if x != nil {
return x.Pod
}
return nil
}
// ListPodsResponse returns a list of Pods.
// Pod bytes are generated by calling v1.Pod#Marshal().
// They are convertable to a pod by calling v1.Pod#Unmarshal().
type ListPodsResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Pods [][]byte `protobuf:"bytes,1,rep,name=pods,proto3" json:"pods,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListPodsResponse) Reset() {
*x = ListPodsResponse{}
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListPodsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListPodsResponse) ProtoMessage() {}
func (x *ListPodsResponse) ProtoReflect() protoreflect.Message {
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListPodsResponse.ProtoReflect.Descriptor instead.
func (*ListPodsResponse) Descriptor() ([]byte, []int) {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP(), []int{1}
}
func (x *ListPodsResponse) GetPods() [][]byte {
if x != nil {
return x.Pods
}
return nil
}
// GetPodResponse returns a Pod.
// Pod bytes are generated by calling v1.Pod#Marshal().
// They are convertable to a pod by calling v1.Pod#Unmarshal().
type GetPodResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Pod []byte `protobuf:"bytes,1,opt,name=pod,proto3" json:"pod,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetPodResponse) Reset() {
*x = GetPodResponse{}
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetPodResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetPodResponse) ProtoMessage() {}
func (x *GetPodResponse) ProtoReflect() protoreflect.Message {
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetPodResponse.ProtoReflect.Descriptor instead.
func (*GetPodResponse) Descriptor() ([]byte, []int) {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP(), []int{2}
}
func (x *GetPodResponse) GetPod() []byte {
if x != nil {
return x.Pod
}
return nil
}
type WatchPodsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *WatchPodsRequest) Reset() {
*x = WatchPodsRequest{}
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *WatchPodsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*WatchPodsRequest) ProtoMessage() {}
func (x *WatchPodsRequest) ProtoReflect() protoreflect.Message {
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use WatchPodsRequest.ProtoReflect.Descriptor instead.
func (*WatchPodsRequest) Descriptor() ([]byte, []int) {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP(), []int{3}
}
type ListPodsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListPodsRequest) Reset() {
*x = ListPodsRequest{}
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListPodsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListPodsRequest) ProtoMessage() {}
func (x *ListPodsRequest) ProtoReflect() protoreflect.Message {
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListPodsRequest.ProtoReflect.Descriptor instead.
func (*ListPodsRequest) Descriptor() ([]byte, []int) {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP(), []int{4}
}
type GetPodRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
PodUID string `protobuf:"bytes,1,opt,name=podUID,proto3" json:"podUID,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetPodRequest) Reset() {
*x = GetPodRequest{}
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetPodRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetPodRequest) ProtoMessage() {}
func (x *GetPodRequest) ProtoReflect() protoreflect.Message {
mi := &file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetPodRequest.ProtoReflect.Descriptor instead.
func (*GetPodRequest) Descriptor() ([]byte, []int) {
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP(), []int{5}
}
func (x *GetPodRequest) GetPodUID() string {
if x != nil {
return x.PodUID
}
return ""
}
var File_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto protoreflect.FileDescriptor
var file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDesc = string([]byte{
0x0a, 0x3b, 0x73, 0x74, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x6b, 0x38,
0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x6c, 0x65, 0x74, 0x2f, 0x70, 0x6b, 0x67,
0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x6f, 0x64, 0x73, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70,
0x68, 0x61, 0x31, 0x2f, 0x61, 0x70, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x76,
0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x22, 0x4b, 0x0a, 0x0e, 0x57, 0x61, 0x74, 0x63, 0x68,
0x50, 0x6f, 0x64, 0x73, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x04, 0x74, 0x79, 0x70,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68,
0x61, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79,
0x70, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x6f, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x03, 0x70, 0x6f, 0x64, 0x22, 0x26, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x6f, 0x64, 0x73,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x64, 0x73,
0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x04, 0x70, 0x6f, 0x64, 0x73, 0x22, 0x22, 0x0a, 0x0e,
0x47, 0x65, 0x74, 0x50, 0x6f, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10,
0x0a, 0x03, 0x70, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x70, 0x6f, 0x64,
0x22, 0x12, 0x0a, 0x10, 0x57, 0x61, 0x74, 0x63, 0x68, 0x50, 0x6f, 0x64, 0x73, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x22, 0x11, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x6f, 0x64, 0x73,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x27, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x6f,
0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6f, 0x64, 0x55,
0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6f, 0x64, 0x55, 0x49, 0x44,
0x2a, 0x31, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a,
0x05, 0x41, 0x44, 0x44, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x4f, 0x44, 0x49,
0x46, 0x49, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45,
0x44, 0x10, 0x02, 0x32, 0xd1, 0x01, 0x0a, 0x04, 0x50, 0x6f, 0x64, 0x73, 0x12, 0x45, 0x0a, 0x09,
0x57, 0x61, 0x74, 0x63, 0x68, 0x50, 0x6f, 0x64, 0x73, 0x12, 0x1a, 0x2e, 0x76, 0x31, 0x61, 0x6c,
0x70, 0x68, 0x61, 0x31, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x50, 0x6f, 0x64, 0x73, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31,
0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x50, 0x6f, 0x64, 0x73, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22,
0x00, 0x30, 0x01, 0x12, 0x43, 0x0a, 0x08, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x6f, 0x64, 0x73, 0x12,
0x19, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50,
0x6f, 0x64, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x76, 0x31, 0x61,
0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x6f, 0x64, 0x73, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x50,
0x6f, 0x64, 0x12, 0x17, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x47, 0x65,
0x74, 0x50, 0x6f, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x76, 0x31,
0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x6f, 0x64, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x27, 0x5a, 0x25, 0x6b, 0x38, 0x73, 0x2e, 0x69,
0x6f, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x6c, 0x65, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70,
0x69, 0x73, 0x2f, 0x70, 0x6f, 0x64, 0x73, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
})
var (
file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescOnce sync.Once
file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescData []byte
)
func file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescGZIP() []byte {
file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescOnce.Do(func() {
file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDesc), len(file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDesc)))
})
return file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDescData
}
var file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_goTypes = []any{
(EventType)(0), // 0: v1alpha1.EventType
(*WatchPodsEvent)(nil), // 1: v1alpha1.WatchPodsEvent
(*ListPodsResponse)(nil), // 2: v1alpha1.ListPodsResponse
(*GetPodResponse)(nil), // 3: v1alpha1.GetPodResponse
(*WatchPodsRequest)(nil), // 4: v1alpha1.WatchPodsRequest
(*ListPodsRequest)(nil), // 5: v1alpha1.ListPodsRequest
(*GetPodRequest)(nil), // 6: v1alpha1.GetPodRequest
}
var file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_depIdxs = []int32{
0, // 0: v1alpha1.WatchPodsEvent.type:type_name -> v1alpha1.EventType
4, // 1: v1alpha1.Pods.WatchPods:input_type -> v1alpha1.WatchPodsRequest
5, // 2: v1alpha1.Pods.ListPods:input_type -> v1alpha1.ListPodsRequest
6, // 3: v1alpha1.Pods.GetPod:input_type -> v1alpha1.GetPodRequest
1, // 4: v1alpha1.Pods.WatchPods:output_type -> v1alpha1.WatchPodsEvent
2, // 5: v1alpha1.Pods.ListPods:output_type -> v1alpha1.ListPodsResponse
3, // 6: v1alpha1.Pods.GetPod:output_type -> v1alpha1.GetPodResponse
4, // [4:7] is the sub-list for method output_type
1, // [1:4] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_init() }
func file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_init() {
if File_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDesc), len(file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_rawDesc)),
NumEnums: 1,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_goTypes,
DependencyIndexes: file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_depIdxs,
EnumInfos: file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_enumTypes,
MessageInfos: file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_msgTypes,
}.Build()
File_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto = out.File
file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_goTypes = nil
file_staging_src_k8s_io_kubelet_pkg_apis_pods_v1alpha1_api_proto_depIdxs = nil
}

View file

@ -0,0 +1,57 @@
syntax = "proto3";
package v1alpha1;
option go_package = "k8s.io/kubelet/pkg/apis/pods/v1alpha1";
service Pods {
// WatchPods will initially return an ADDED WatchPodsEvent for each
// existing Pod, and then return a WatchPodsEvent whenever a Pod is created,
// deleted, or updated.
rpc WatchPods(WatchPodsRequest) returns (stream WatchPodsEvent) {}
// ListPods returns a of List of Pods
rpc ListPods(ListPodsRequest) returns (ListPodsResponse) {}
// GetPod returns a Pod for given pod's UID
rpc GetPod(GetPodRequest) returns (GetPodResponse) {}
}
enum EventType {
ADDED = 0;
MODIFIED = 1;
DELETED = 2;
}
// This is sent for each update in the Watch stream
// Pod bytes are generated by calling v1.Pod#Marshal().
// They are convertable to a pod by calling v1.Pod#Unmarshal().
message WatchPodsEvent {
// ADDED, MODIFIED, DELETED
EventType type = 1;
bytes pod = 2;
}
// ListPodsResponse returns a list of Pods.
// Pod bytes are generated by calling v1.Pod#Marshal().
// They are convertable to a pod by calling v1.Pod#Unmarshal().
message ListPodsResponse {
repeated bytes pods = 1;
}
// GetPodResponse returns a Pod.
// Pod bytes are generated by calling v1.Pod#Marshal().
// They are convertable to a pod by calling v1.Pod#Unmarshal().
message GetPodResponse {
bytes pod = 1;
}
message WatchPodsRequest {}
message ListPodsRequest {}
message GetPodRequest {
string podUID = 1;
}

View file

@ -0,0 +1,227 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v4.23.4
// source: staging/src/k8s.io/kubelet/pkg/apis/pods/v1alpha1/api.proto
package v1alpha1
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
Pods_WatchPods_FullMethodName = "/v1alpha1.Pods/WatchPods"
Pods_ListPods_FullMethodName = "/v1alpha1.Pods/ListPods"
Pods_GetPod_FullMethodName = "/v1alpha1.Pods/GetPod"
)
// PodsClient is the client API for Pods service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type PodsClient interface {
// WatchPods will initially return an ADDED WatchPodsEvent for each
// existing Pod, and then return a WatchPodsEvent whenever a Pod is created,
// deleted, or updated.
WatchPods(ctx context.Context, in *WatchPodsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[WatchPodsEvent], error)
// ListPods returns a of List of Pods
ListPods(ctx context.Context, in *ListPodsRequest, opts ...grpc.CallOption) (*ListPodsResponse, error)
// GetPod returns a Pod for given pod's UID
GetPod(ctx context.Context, in *GetPodRequest, opts ...grpc.CallOption) (*GetPodResponse, error)
}
type podsClient struct {
cc grpc.ClientConnInterface
}
func NewPodsClient(cc grpc.ClientConnInterface) PodsClient {
return &podsClient{cc}
}
func (c *podsClient) WatchPods(ctx context.Context, in *WatchPodsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[WatchPodsEvent], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &Pods_ServiceDesc.Streams[0], Pods_WatchPods_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[WatchPodsRequest, WatchPodsEvent]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type Pods_WatchPodsClient = grpc.ServerStreamingClient[WatchPodsEvent]
func (c *podsClient) ListPods(ctx context.Context, in *ListPodsRequest, opts ...grpc.CallOption) (*ListPodsResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListPodsResponse)
err := c.cc.Invoke(ctx, Pods_ListPods_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *podsClient) GetPod(ctx context.Context, in *GetPodRequest, opts ...grpc.CallOption) (*GetPodResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetPodResponse)
err := c.cc.Invoke(ctx, Pods_GetPod_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// PodsServer is the server API for Pods service.
// All implementations must embed UnimplementedPodsServer
// for forward compatibility.
type PodsServer interface {
// WatchPods will initially return an ADDED WatchPodsEvent for each
// existing Pod, and then return a WatchPodsEvent whenever a Pod is created,
// deleted, or updated.
WatchPods(*WatchPodsRequest, grpc.ServerStreamingServer[WatchPodsEvent]) error
// ListPods returns a of List of Pods
ListPods(context.Context, *ListPodsRequest) (*ListPodsResponse, error)
// GetPod returns a Pod for given pod's UID
GetPod(context.Context, *GetPodRequest) (*GetPodResponse, error)
mustEmbedUnimplementedPodsServer()
}
// UnimplementedPodsServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedPodsServer struct{}
func (UnimplementedPodsServer) WatchPods(*WatchPodsRequest, grpc.ServerStreamingServer[WatchPodsEvent]) error {
return status.Errorf(codes.Unimplemented, "method WatchPods not implemented")
}
func (UnimplementedPodsServer) ListPods(context.Context, *ListPodsRequest) (*ListPodsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListPods not implemented")
}
func (UnimplementedPodsServer) GetPod(context.Context, *GetPodRequest) (*GetPodResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetPod not implemented")
}
func (UnimplementedPodsServer) mustEmbedUnimplementedPodsServer() {}
func (UnimplementedPodsServer) testEmbeddedByValue() {}
// UnsafePodsServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to PodsServer will
// result in compilation errors.
type UnsafePodsServer interface {
mustEmbedUnimplementedPodsServer()
}
func RegisterPodsServer(s grpc.ServiceRegistrar, srv PodsServer) {
// If the following call pancis, it indicates UnimplementedPodsServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&Pods_ServiceDesc, srv)
}
func _Pods_WatchPods_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(WatchPodsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(PodsServer).WatchPods(m, &grpc.GenericServerStream[WatchPodsRequest, WatchPodsEvent]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type Pods_WatchPodsServer = grpc.ServerStreamingServer[WatchPodsEvent]
func _Pods_ListPods_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListPodsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PodsServer).ListPods(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Pods_ListPods_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PodsServer).ListPods(ctx, req.(*ListPodsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Pods_GetPod_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetPodRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PodsServer).GetPod(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Pods_GetPod_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PodsServer).GetPod(ctx, req.(*GetPodRequest))
}
return interceptor(ctx, in, info, handler)
}
// Pods_ServiceDesc is the grpc.ServiceDesc for Pods service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Pods_ServiceDesc = grpc.ServiceDesc{
ServiceName: "v1alpha1.Pods",
HandlerType: (*PodsServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ListPods",
Handler: _Pods_ListPods_Handler,
},
{
MethodName: "GetPod",
Handler: _Pods_GetPod_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "WatchPods",
Handler: _Pods_WatchPods_Handler,
ServerStreams: true,
},
},
Metadata: "staging/src/k8s.io/kubelet/pkg/apis/pods/v1alpha1/api.proto",
}

View file

@ -1447,6 +1447,12 @@
lockToDefault: false
preRelease: Beta
version: "1.29"
- name: PodsAPI
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.35"
- name: PodSchedulingReadiness
versionedSpecs:
- default: false

View file

@ -0,0 +1,199 @@
//go:build linux
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
podsv1alpha1 "k8s.io/kubelet/pkg/apis/pods/v1alpha1"
kubefeatures "k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/pods"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
admissionapi "k8s.io/pod-security-admission/api"
)
// podsAPISuite is a Ginkgo test suite for the Kubelet Pods API.
var _ = SIGDescribe("Kubelet Pods API", framework.WithSerial(), func() {
f := framework.NewDefaultFramework("pods-api-test")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
ginkgo.Context("when the PodsAPI feature gate is enabled", func() {
var (
conn *grpc.ClientConn
client podsv1alpha1.PodsClient
)
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
if initialConfig.FeatureGates == nil {
initialConfig.FeatureGates = make(map[string]bool)
}
initialConfig.FeatureGates[string(kubefeatures.PodsAPI)] = true
})
ginkgo.BeforeEach(func(ctx context.Context) {
ginkgo.By("Wait for the node to be ready")
waitForNodeReady(ctx)
ginkgo.By("Connecting to Pods API")
endpoint, err := util.LocalEndpoint("/var/lib/kubelet/pods-api", pods.Socket)
framework.ExpectNoError(err, "failed to get local endpoint for Pods API")
gomega.Eventually(ctx, func(ctx context.Context) error {
conn, err = grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
return err
}
client = podsv1alpha1.NewPodsClient(conn)
// Make a simple call to ensure the server is responsive.
_, err = client.ListPods(ctx, &podsv1alpha1.ListPodsRequest{})
if err != nil {
conn.Close() // Close connection on failure to retry dialing
return err
}
return nil
}, "1m", "5s").Should(gomega.Succeed(), "failed to connect to Pods API")
})
ginkgo.AfterEach(func() {
if conn != nil {
conn.Close()
}
})
ginkgo.It("should be able to list, get, and watch pods", func(ctx context.Context) {
// Create a test pod
podName := "test-pod-" + string(uuid.NewUUID())
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: f.Namespace.Name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test-container",
Image: "busybox:1.36",
Command: []string{"sleep", "3600"},
},
},
},
}
ginkgo.By("creating a test pod")
testPod := e2epod.NewPodClient(f).CreateSync(ctx, pod)
ginkgo.By("listing pods and ensuring the test pod is present")
gomega.Eventually(ctx, func(ctx context.Context) bool {
listResp, err := client.ListPods(ctx, &podsv1alpha1.ListPodsRequest{})
if err != nil {
framework.Logf("failed to list pods, will retry: %v", err)
return false
}
for _, p := range listResp.Pods {
var pod v1.Pod
err := pod.Unmarshal(p)
if err != nil {
framework.Logf("failed to unmarshal pod, will retry: %v", err)
return false
}
if pod.ObjectMeta.UID == testPod.UID {
return true
}
}
return false
}, "1m", "5s").Should(gomega.BeTrue(), "test pod not found in list")
ginkgo.By("getting the test pod by UID")
getResp, err := client.GetPod(ctx, &podsv1alpha1.GetPodRequest{PodUID: string(testPod.UID)})
framework.ExpectNoError(err, "failed to get pod")
var podFromGet v1.Pod
err = podFromGet.Unmarshal(getResp.Pod)
framework.ExpectNoError(err, "failed to unmarshal pod from get")
gomega.Expect(podFromGet.ObjectMeta.UID).To(gomega.Equal(testPod.UID))
ginkgo.By("watching for pod events")
watchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
watchClient, err := client.WatchPods(watchCtx, &podsv1alpha1.WatchPodsRequest{})
framework.ExpectNoError(err, "failed to watch pods")
// Expect to receive an ADDED event for the new pod
var podFromWatch v1.Pod
gomega.Eventually(ctx, func(ctx context.Context) (types.UID, error) {
event, err := watchClient.Recv()
if err != nil {
return "", err
}
if event.Type != podsv1alpha1.EventType_ADDED {
return "", nil // Continue waiting
}
if err := podFromWatch.Unmarshal(event.Pod); err != nil {
return "", err
}
return podFromWatch.ObjectMeta.UID, nil
}, "1m", "1s").Should(gomega.Equal(testPod.UID), "did not receive ADDED event for the test pod")
ginkgo.By("deleting the test pod")
eventChan := make(chan *podsv1alpha1.WatchPodsEvent, 10)
go func() {
defer ginkgo.GinkgoRecover()
defer close(eventChan)
for {
event, err := watchClient.Recv()
if err != nil {
return // Error will be detected by Eventually timeout
}
eventChan <- event
}
}()
gracePeriod := int64(0)
err = e2epod.NewPodClient(f).Delete(ctx, testPod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
framework.ExpectNoError(err, "failed to delete pod")
// Expect to receive a DELETED event
gomega.Eventually(eventChan, "1m", "100ms").Should(gomega.Receive(gomega.SatisfyAll(
gomega.WithTransform(func(event *podsv1alpha1.WatchPodsEvent) podsv1alpha1.EventType { return event.Type }, gomega.Equal(podsv1alpha1.EventType_DELETED)),
gomega.WithTransform(
func(event *podsv1alpha1.WatchPodsEvent) types.UID {
var p v1.Pod
if err := p.Unmarshal(event.Pod); err != nil {
return ""
}
return p.UID
},
gomega.Equal(testPod.UID),
),
)), "did not receive DELETED event for the test pod")
})
})
})