cri-api: Add streaming RPCs for CRI list operations

Add server-side streaming RPCs to bypass the gRPC 16MB message size
limit on nodes with many containers/pods. This implements KEP-5825.

New RuntimeService streaming RPCs:
- StreamPodSandboxes
- StreamContainers
- StreamContainerStats
- StreamPodSandboxStats
- StreamPodSandboxMetrics

New ImageService streaming RPC:
- StreamImages

Each streaming RPC accepts the same filter as its unary counterpart
and streams results one item at a time.

Feature gate: CRIListStreaming
KEP: https://kep.k8s.io/5825

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Signed-off-by: Ayato Tokubi <atokubi@redhat.com>
This commit is contained in:
Ayato Tokubi 2026-02-12 14:55:38 +00:00
parent 09183519e6
commit 3256f5175f
19 changed files with 4147 additions and 2299 deletions

View file

@ -41,11 +41,11 @@ type Impl interface {
}
func (*defaultImpl) NewRemoteRuntimeService(ctx context.Context, endpoint string, connectionTimeout time.Duration) (criapi.RuntimeService, error) {
return criclient.NewRemoteRuntimeService(ctx, endpoint, connectionTimeout, nil)
return criclient.NewRemoteRuntimeService(ctx, endpoint, connectionTimeout, nil, false)
}
func (*defaultImpl) NewRemoteImageService(ctx context.Context, endpoint string, connectionTimeout time.Duration) (criapi.ImageManagerService, error) {
return criclient.NewRemoteImageService(ctx, endpoint, connectionTimeout, nil)
return criclient.NewRemoteImageService(ctx, endpoint, connectionTimeout, nil, false)
}
func (*defaultImpl) RuntimeConfig(ctx context.Context, runtimeService criapi.RuntimeService) (*runtimeapi.RuntimeConfigResponse, error) {

View file

@ -247,14 +247,14 @@ func run(ctx context.Context, config *hollowNodeConfig) error {
return fmt.Errorf("Failed to start fake runtime, error: %w", err)
}
defer fakeRemoteRuntime.Stop()
runtimeService, err := remote.NewRemoteRuntimeService(ctx, endpoint, 15*time.Second, noop.NewTracerProvider())
runtimeService, err := remote.NewRemoteRuntimeService(ctx, endpoint, 15*time.Second, noop.NewTracerProvider(), false)
if err != nil {
return fmt.Errorf("Failed to init runtime service, error: %w", err)
}
var imageService internalapi.ImageManagerService = fakeRemoteRuntime.ImageService
if config.UseHostImageService {
imageService, err = remote.NewRemoteImageService(ctx, c.ImageServiceEndpoint, 15*time.Second, noop.NewTracerProvider())
imageService, err = remote.NewRemoteImageService(ctx, c.ImageServiceEndpoint, 15*time.Second, noop.NewTracerProvider(), false)
if err != nil {
return fmt.Errorf("Failed to init image service, error: %w", err)
}

View file

@ -118,6 +118,12 @@ const (
// Allow the usage of options to fine-tune the cpumanager policies.
CPUManagerPolicyOptions featuregate.Feature = "CPUManagerPolicyOptions"
// owner: @bitoku
// kep: https://kep.k8s.io/5825
//
// Enables using streaming RPCs for CRI list operations.
CRIListStreaming featuregate.Feature = "CRIListStreaming"
// owner: @aramase
// kep: http://kep.k8s.io/5538
//
@ -1221,6 +1227,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.36
},
CRIListStreaming: {
{Version: version.MustParse("1.36"), Default: false, PreRelease: featuregate.Alpha},
},
CSIServiceAccountTokenSecrets: {
{Version: version.MustParse("1.35"), Default: true, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.36; remove in 1.39
@ -2283,6 +2293,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
CPUManagerPolicyOptions: {},
CRIListStreaming: {},
CSIServiceAccountTokenSecrets: {},
CSIVolumeHealth: {},

View file

@ -405,10 +405,11 @@ func PreInitRuntimeService(ctx context.Context, kubeCfg *kubeletconfiginternal.K
remoteImageEndpoint = kubeCfg.ContainerRuntimeEndpoint
}
var err error
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(ctx, kubeCfg.ContainerRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
useStreaming := utilfeature.DefaultFeatureGate.Enabled(features.CRIListStreaming)
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(ctx, kubeCfg.ContainerRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider, useStreaming); err != nil {
return err
}
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(ctx, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(ctx, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider, useStreaming); err != nil {
return err
}

View file

@ -3275,7 +3275,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
}
func createRemoteRuntimeService(ctx context.Context, endpoint string, t *testing.T, tp oteltrace.TracerProvider) internalapi.RuntimeService {
runtimeService, err := remote.NewRemoteRuntimeService(ctx, endpoint, 15*time.Second, tp)
runtimeService, err := remote.NewRemoteRuntimeService(ctx, endpoint, 15*time.Second, tp, false)
require.NoError(t, err)
return runtimeService
}
@ -3600,7 +3600,7 @@ func TestSyncPodSpans(t *testing.T) {
fakeRuntime.ImageService.SetFakeImageSize(100)
fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"})
imageSvc, err := remote.NewRemoteImageService(tCtx, endpoint, 15*time.Second, tp)
imageSvc, err := remote.NewRemoteImageService(tCtx, endpoint, 15*time.Second, tp, false)
assert.NoError(t, err)
kubelet.containerRuntime, _, err = kuberuntime.NewKubeGenericRuntimeManager(

File diff suppressed because it is too large Load diff

View file

@ -48,6 +48,19 @@ service RuntimeService {
rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
// ListPodSandbox returns a list of PodSandboxes.
rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}
// StreamPodSandboxes returns a stream of PodSandboxes.
// This is an alternative to ListPodSandbox that streams results in lists
// of at least one item, avoiding the gRPC message size limit for nodes with
// many pods. The number of items per list may vary depending on the
// container runtime. Each item must appear in exactly one response and must
// not be duplicated across responses in the same stream. The server must
// close the stream with EOF after all items have been sent. The kubelet
// collects all items from the stream and processes them all at once after
// the stream completes. The kubelet enforces a timeout on the entire stream
// and will discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
rpc StreamPodSandboxes(StreamPodSandboxesRequest) returns (stream StreamPodSandboxesResponse) {}
// CreateContainer creates a new container in specified PodSandbox
rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
@ -66,6 +79,19 @@ service RuntimeService {
rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
// ListContainers lists all containers by filters.
rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
// StreamContainers returns a stream of containers.
// This is an alternative to ListContainers that streams results in lists
// of at least one item, avoiding the gRPC message size limit for nodes with
// many containers. The number of items per list may vary depending on the
// container runtime. Each item must appear in exactly one response and must
// not be duplicated across responses in the same stream. The server must
// close the stream with EOF after all items have been sent. The kubelet
// collects all items from the stream and processes them all at once after
// the stream completes. The kubelet enforces a timeout on the entire stream
// and will discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
rpc StreamContainers(StreamContainersRequest) returns (stream StreamContainersResponse) {}
// ContainerStatus returns status of the container. If the container is not
// present, returns an error.
rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
@ -93,12 +119,40 @@ service RuntimeService {
rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
// ListContainerStats returns stats of all running containers.
rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}
// StreamContainerStats returns a stream of container stats.
// This is an alternative to ListContainerStats that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many containers. The number of items per list may vary
// depending on the container runtime. Each item must appear in exactly one
// response and must not be duplicated across responses in the same stream.
// The server must close the stream with EOF after all items have been sent.
// The kubelet collects all items from the stream and processes them all at
// once after the stream completes. The kubelet enforces a timeout on the
// entire stream and will discard partial results if the stream is not
// completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
rpc StreamContainerStats(StreamContainerStatsRequest) returns (stream StreamContainerStatsResponse) {}
// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
// exist, the call returns an error.
rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}
// StreamPodSandboxStats returns a stream of pod sandbox stats.
// This is an alternative to ListPodSandboxStats that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many pods. The number of items per list may vary depending on
// the container runtime. Each item must appear in exactly one response and
// must not be duplicated across responses in the same stream. The server
// must close the stream with EOF after all items have been sent. The
// kubelet collects all items from the stream and processes them all at once
// after the stream completes. The kubelet enforces a timeout on the entire
// stream and will discard partial results if the stream is not completed
// in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
rpc StreamPodSandboxStats(StreamPodSandboxStatsRequest) returns (stream StreamPodSandboxStatsResponse) {}
// UpdateRuntimeConfig updates the runtime configuration based on the given request.
rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}
@ -121,6 +175,20 @@ service RuntimeService {
// ListPodSandboxMetrics gets pod sandbox metrics from CRI Runtime
rpc ListPodSandboxMetrics(ListPodSandboxMetricsRequest) returns (ListPodSandboxMetricsResponse) {}
// StreamPodSandboxMetrics returns a stream of pod sandbox metrics.
// This is an alternative to ListPodSandboxMetrics that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many pods. The number of items per list may vary depending on
// the container runtime. Each item must appear in exactly one response and
// must not be duplicated across responses in the same stream. The server
// must close the stream with EOF after all items have been sent. The
// kubelet collects all items from the stream and processes them all at once
// after the stream completes. The kubelet enforces a timeout on the entire
// stream and will discard partial results if the stream is not completed
// in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
rpc StreamPodSandboxMetrics(StreamPodSandboxMetricsRequest) returns (stream StreamPodSandboxMetricsResponse) {}
// RuntimeConfig returns configuration information of the runtime.
// A couple of notes:
@ -143,6 +211,19 @@ service RuntimeService {
service ImageService {
// ListImages lists existing images.
rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
// StreamImages returns a stream of images.
// This is an alternative to ListImages that streams results in lists of at
// least one item, avoiding the gRPC message size limit for nodes with many
// images. The number of items per list may vary depending on the container
// runtime. Each item must appear in exactly one response and must not be
// duplicated across responses in the same stream. The server must close the
// stream with EOF after all items have been sent. The kubelet collects all
// items from the stream and processes them all at once after the stream
// completes. The kubelet enforces a timeout on the entire stream and will
// discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
rpc StreamImages(StreamImagesRequest) returns (stream StreamImagesResponse) {}
// ImageStatus returns the status of the image. If the image is not
// present, returns a response with ImageStatusResponse.Image set to
// nil.
@ -678,6 +759,16 @@ message ListPodSandboxResponse {
repeated PodSandbox items = 1;
}
message StreamPodSandboxesRequest {
// Filter for the list request.
PodSandboxFilter filter = 1;
}
message StreamPodSandboxesResponse {
// List of PodSandboxes.
repeated PodSandbox pod_sandboxes = 1;
}
message PodSandboxStatsRequest {
// ID of the pod sandbox for which to retrieve stats.
string pod_sandbox_id = 1;
@ -708,6 +799,16 @@ message ListPodSandboxStatsResponse {
repeated PodSandboxStats stats = 1;
}
message StreamPodSandboxStatsRequest {
// Filter for the list request.
PodSandboxStatsFilter filter = 1;
}
message StreamPodSandboxStatsResponse {
// List of pod sandbox stats.
repeated PodSandboxStats pod_sandbox_stats = 1;
}
// PodSandboxAttributes provides basic information of the pod sandbox.
message PodSandboxAttributes {
// ID of the pod sandbox.
@ -1374,6 +1475,16 @@ message ListContainersResponse {
repeated Container containers = 1;
}
message StreamContainersRequest {
// Filter for the list request.
ContainerFilter filter = 1;
}
message StreamContainersResponse {
// List of containers.
repeated Container containers = 1;
}
message ContainerStatusRequest {
// ID of the container for which to retrieve status.
string container_id = 1;
@ -1610,6 +1721,16 @@ message ListImagesResponse {
repeated Image images = 1;
}
message StreamImagesRequest {
// Filter to list images.
ImageFilter filter = 1;
}
message StreamImagesResponse {
// List of images.
repeated Image images = 1;
}
message ImageStatusRequest {
// Spec of the image.
ImageSpec image = 1;
@ -1853,6 +1974,16 @@ message ListContainerStatsResponse {
repeated ContainerStats stats = 1;
}
message StreamContainerStatsRequest {
// Filter for the list request.
ContainerStatsFilter filter = 1;
}
message StreamContainerStatsResponse {
// List of container stats.
repeated ContainerStats container_stats = 1;
}
// ContainerAttributes provides basic information of the container.
message ContainerAttributes {
// ID of the container.
@ -2069,6 +2200,13 @@ message ListPodSandboxMetricsResponse {
repeated PodSandboxMetrics pod_metrics = 1;
}
message StreamPodSandboxMetricsRequest {}
message StreamPodSandboxMetricsResponse {
// List of pod sandbox metrics.
repeated PodSandboxMetrics pod_sandbox_metrics = 1;
}
message PodSandboxMetrics {
string pod_sandbox_id = 1;
repeated Metric metrics = 2;

View file

@ -58,11 +58,13 @@ const (
RuntimeService_RemovePodSandbox_FullMethodName = "/runtime.v1.RuntimeService/RemovePodSandbox"
RuntimeService_PodSandboxStatus_FullMethodName = "/runtime.v1.RuntimeService/PodSandboxStatus"
RuntimeService_ListPodSandbox_FullMethodName = "/runtime.v1.RuntimeService/ListPodSandbox"
RuntimeService_StreamPodSandboxes_FullMethodName = "/runtime.v1.RuntimeService/StreamPodSandboxes"
RuntimeService_CreateContainer_FullMethodName = "/runtime.v1.RuntimeService/CreateContainer"
RuntimeService_StartContainer_FullMethodName = "/runtime.v1.RuntimeService/StartContainer"
RuntimeService_StopContainer_FullMethodName = "/runtime.v1.RuntimeService/StopContainer"
RuntimeService_RemoveContainer_FullMethodName = "/runtime.v1.RuntimeService/RemoveContainer"
RuntimeService_ListContainers_FullMethodName = "/runtime.v1.RuntimeService/ListContainers"
RuntimeService_StreamContainers_FullMethodName = "/runtime.v1.RuntimeService/StreamContainers"
RuntimeService_ContainerStatus_FullMethodName = "/runtime.v1.RuntimeService/ContainerStatus"
RuntimeService_UpdateContainerResources_FullMethodName = "/runtime.v1.RuntimeService/UpdateContainerResources"
RuntimeService_ReopenContainerLog_FullMethodName = "/runtime.v1.RuntimeService/ReopenContainerLog"
@ -72,14 +74,17 @@ const (
RuntimeService_PortForward_FullMethodName = "/runtime.v1.RuntimeService/PortForward"
RuntimeService_ContainerStats_FullMethodName = "/runtime.v1.RuntimeService/ContainerStats"
RuntimeService_ListContainerStats_FullMethodName = "/runtime.v1.RuntimeService/ListContainerStats"
RuntimeService_StreamContainerStats_FullMethodName = "/runtime.v1.RuntimeService/StreamContainerStats"
RuntimeService_PodSandboxStats_FullMethodName = "/runtime.v1.RuntimeService/PodSandboxStats"
RuntimeService_ListPodSandboxStats_FullMethodName = "/runtime.v1.RuntimeService/ListPodSandboxStats"
RuntimeService_StreamPodSandboxStats_FullMethodName = "/runtime.v1.RuntimeService/StreamPodSandboxStats"
RuntimeService_UpdateRuntimeConfig_FullMethodName = "/runtime.v1.RuntimeService/UpdateRuntimeConfig"
RuntimeService_Status_FullMethodName = "/runtime.v1.RuntimeService/Status"
RuntimeService_CheckpointContainer_FullMethodName = "/runtime.v1.RuntimeService/CheckpointContainer"
RuntimeService_GetContainerEvents_FullMethodName = "/runtime.v1.RuntimeService/GetContainerEvents"
RuntimeService_ListMetricDescriptors_FullMethodName = "/runtime.v1.RuntimeService/ListMetricDescriptors"
RuntimeService_ListPodSandboxMetrics_FullMethodName = "/runtime.v1.RuntimeService/ListPodSandboxMetrics"
RuntimeService_StreamPodSandboxMetrics_FullMethodName = "/runtime.v1.RuntimeService/StreamPodSandboxMetrics"
RuntimeService_RuntimeConfig_FullMethodName = "/runtime.v1.RuntimeService/RuntimeConfig"
RuntimeService_UpdatePodSandboxResources_FullMethodName = "/runtime.v1.RuntimeService/UpdatePodSandboxResources"
)
@ -115,6 +120,19 @@ type RuntimeServiceClient interface {
PodSandboxStatus(ctx context.Context, in *PodSandboxStatusRequest, opts ...grpc.CallOption) (*PodSandboxStatusResponse, error)
// ListPodSandbox returns a list of PodSandboxes.
ListPodSandbox(ctx context.Context, in *ListPodSandboxRequest, opts ...grpc.CallOption) (*ListPodSandboxResponse, error)
// StreamPodSandboxes returns a stream of PodSandboxes.
// This is an alternative to ListPodSandbox that streams results in lists
// of at least one item, avoiding the gRPC message size limit for nodes with
// many pods. The number of items per list may vary depending on the
// container runtime. Each item must appear in exactly one response and must
// not be duplicated across responses in the same stream. The server must
// close the stream with EOF after all items have been sent. The kubelet
// collects all items from the stream and processes them all at once after
// the stream completes. The kubelet enforces a timeout on the entire stream
// and will discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamPodSandboxes(ctx context.Context, in *StreamPodSandboxesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamPodSandboxesResponse], error)
// CreateContainer creates a new container in specified PodSandbox
CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error)
// StartContainer starts the container.
@ -132,6 +150,19 @@ type RuntimeServiceClient interface {
RemoveContainer(ctx context.Context, in *RemoveContainerRequest, opts ...grpc.CallOption) (*RemoveContainerResponse, error)
// ListContainers lists all containers by filters.
ListContainers(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (*ListContainersResponse, error)
// StreamContainers returns a stream of containers.
// This is an alternative to ListContainers that streams results in lists
// of at least one item, avoiding the gRPC message size limit for nodes with
// many containers. The number of items per list may vary depending on the
// container runtime. Each item must appear in exactly one response and must
// not be duplicated across responses in the same stream. The server must
// close the stream with EOF after all items have been sent. The kubelet
// collects all items from the stream and processes them all at once after
// the stream completes. The kubelet enforces a timeout on the entire stream
// and will discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamContainers(ctx context.Context, in *StreamContainersRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamContainersResponse], error)
// ContainerStatus returns status of the container. If the container is not
// present, returns an error.
ContainerStatus(ctx context.Context, in *ContainerStatusRequest, opts ...grpc.CallOption) (*ContainerStatusResponse, error)
@ -157,11 +188,39 @@ type RuntimeServiceClient interface {
ContainerStats(ctx context.Context, in *ContainerStatsRequest, opts ...grpc.CallOption) (*ContainerStatsResponse, error)
// ListContainerStats returns stats of all running containers.
ListContainerStats(ctx context.Context, in *ListContainerStatsRequest, opts ...grpc.CallOption) (*ListContainerStatsResponse, error)
// StreamContainerStats returns a stream of container stats.
// This is an alternative to ListContainerStats that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many containers. The number of items per list may vary
// depending on the container runtime. Each item must appear in exactly one
// response and must not be duplicated across responses in the same stream.
// The server must close the stream with EOF after all items have been sent.
// The kubelet collects all items from the stream and processes them all at
// once after the stream completes. The kubelet enforces a timeout on the
// entire stream and will discard partial results if the stream is not
// completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamContainerStats(ctx context.Context, in *StreamContainerStatsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamContainerStatsResponse], error)
// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
// exist, the call returns an error.
PodSandboxStats(ctx context.Context, in *PodSandboxStatsRequest, opts ...grpc.CallOption) (*PodSandboxStatsResponse, error)
// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
ListPodSandboxStats(ctx context.Context, in *ListPodSandboxStatsRequest, opts ...grpc.CallOption) (*ListPodSandboxStatsResponse, error)
// StreamPodSandboxStats returns a stream of pod sandbox stats.
// This is an alternative to ListPodSandboxStats that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many pods. The number of items per list may vary depending on
// the container runtime. Each item must appear in exactly one response and
// must not be duplicated across responses in the same stream. The server
// must close the stream with EOF after all items have been sent. The
// kubelet collects all items from the stream and processes them all at once
// after the stream completes. The kubelet enforces a timeout on the entire
// stream and will discard partial results if the stream is not completed
// in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamPodSandboxStats(ctx context.Context, in *StreamPodSandboxStatsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamPodSandboxStatsResponse], error)
// UpdateRuntimeConfig updates the runtime configuration based on the given request.
UpdateRuntimeConfig(ctx context.Context, in *UpdateRuntimeConfigRequest, opts ...grpc.CallOption) (*UpdateRuntimeConfigResponse, error)
// Status returns the status of the runtime.
@ -178,6 +237,20 @@ type RuntimeServiceClient interface {
ListMetricDescriptors(ctx context.Context, in *ListMetricDescriptorsRequest, opts ...grpc.CallOption) (*ListMetricDescriptorsResponse, error)
// ListPodSandboxMetrics gets pod sandbox metrics from CRI Runtime
ListPodSandboxMetrics(ctx context.Context, in *ListPodSandboxMetricsRequest, opts ...grpc.CallOption) (*ListPodSandboxMetricsResponse, error)
// StreamPodSandboxMetrics returns a stream of pod sandbox metrics.
// This is an alternative to ListPodSandboxMetrics that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many pods. The number of items per list may vary depending on
// the container runtime. Each item must appear in exactly one response and
// must not be duplicated across responses in the same stream. The server
// must close the stream with EOF after all items have been sent. The
// kubelet collects all items from the stream and processes them all at once
// after the stream completes. The kubelet enforces a timeout on the entire
// stream and will discard partial results if the stream is not completed
// in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamPodSandboxMetrics(ctx context.Context, in *StreamPodSandboxMetricsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamPodSandboxMetricsResponse], error)
// RuntimeConfig returns configuration information of the runtime.
// A couple of notes:
// - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.
@ -262,6 +335,25 @@ func (c *runtimeServiceClient) ListPodSandbox(ctx context.Context, in *ListPodSa
return out, nil
}
func (c *runtimeServiceClient) StreamPodSandboxes(ctx context.Context, in *StreamPodSandboxesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamPodSandboxesResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RuntimeService_ServiceDesc.Streams[0], RuntimeService_StreamPodSandboxes_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamPodSandboxesRequest, StreamPodSandboxesResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamPodSandboxesClient = grpc.ServerStreamingClient[StreamPodSandboxesResponse]
func (c *runtimeServiceClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CreateContainerResponse)
@ -312,6 +404,25 @@ func (c *runtimeServiceClient) ListContainers(ctx context.Context, in *ListConta
return out, nil
}
func (c *runtimeServiceClient) StreamContainers(ctx context.Context, in *StreamContainersRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamContainersResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RuntimeService_ServiceDesc.Streams[1], RuntimeService_StreamContainers_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamContainersRequest, StreamContainersResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamContainersClient = grpc.ServerStreamingClient[StreamContainersResponse]
func (c *runtimeServiceClient) ContainerStatus(ctx context.Context, in *ContainerStatusRequest, opts ...grpc.CallOption) (*ContainerStatusResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ContainerStatusResponse)
@ -402,6 +513,25 @@ func (c *runtimeServiceClient) ListContainerStats(ctx context.Context, in *ListC
return out, nil
}
func (c *runtimeServiceClient) StreamContainerStats(ctx context.Context, in *StreamContainerStatsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamContainerStatsResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RuntimeService_ServiceDesc.Streams[2], RuntimeService_StreamContainerStats_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamContainerStatsRequest, StreamContainerStatsResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamContainerStatsClient = grpc.ServerStreamingClient[StreamContainerStatsResponse]
func (c *runtimeServiceClient) PodSandboxStats(ctx context.Context, in *PodSandboxStatsRequest, opts ...grpc.CallOption) (*PodSandboxStatsResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(PodSandboxStatsResponse)
@ -422,6 +552,25 @@ func (c *runtimeServiceClient) ListPodSandboxStats(ctx context.Context, in *List
return out, nil
}
func (c *runtimeServiceClient) StreamPodSandboxStats(ctx context.Context, in *StreamPodSandboxStatsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamPodSandboxStatsResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RuntimeService_ServiceDesc.Streams[3], RuntimeService_StreamPodSandboxStats_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamPodSandboxStatsRequest, StreamPodSandboxStatsResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamPodSandboxStatsClient = grpc.ServerStreamingClient[StreamPodSandboxStatsResponse]
func (c *runtimeServiceClient) UpdateRuntimeConfig(ctx context.Context, in *UpdateRuntimeConfigRequest, opts ...grpc.CallOption) (*UpdateRuntimeConfigResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(UpdateRuntimeConfigResponse)
@ -454,7 +603,7 @@ func (c *runtimeServiceClient) CheckpointContainer(ctx context.Context, in *Chec
func (c *runtimeServiceClient) GetContainerEvents(ctx context.Context, in *GetEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[ContainerEventResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RuntimeService_ServiceDesc.Streams[0], RuntimeService_GetContainerEvents_FullMethodName, cOpts...)
stream, err := c.cc.NewStream(ctx, &RuntimeService_ServiceDesc.Streams[4], RuntimeService_GetContainerEvents_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
@ -491,6 +640,25 @@ func (c *runtimeServiceClient) ListPodSandboxMetrics(ctx context.Context, in *Li
return out, nil
}
func (c *runtimeServiceClient) StreamPodSandboxMetrics(ctx context.Context, in *StreamPodSandboxMetricsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamPodSandboxMetricsResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RuntimeService_ServiceDesc.Streams[5], RuntimeService_StreamPodSandboxMetrics_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamPodSandboxMetricsRequest, StreamPodSandboxMetricsResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamPodSandboxMetricsClient = grpc.ServerStreamingClient[StreamPodSandboxMetricsResponse]
func (c *runtimeServiceClient) RuntimeConfig(ctx context.Context, in *RuntimeConfigRequest, opts ...grpc.CallOption) (*RuntimeConfigResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RuntimeConfigResponse)
@ -542,6 +710,19 @@ type RuntimeServiceServer interface {
PodSandboxStatus(context.Context, *PodSandboxStatusRequest) (*PodSandboxStatusResponse, error)
// ListPodSandbox returns a list of PodSandboxes.
ListPodSandbox(context.Context, *ListPodSandboxRequest) (*ListPodSandboxResponse, error)
// StreamPodSandboxes returns a stream of PodSandboxes.
// This is an alternative to ListPodSandbox that streams results in lists
// of at least one item, avoiding the gRPC message size limit for nodes with
// many pods. The number of items per list may vary depending on the
// container runtime. Each item must appear in exactly one response and must
// not be duplicated across responses in the same stream. The server must
// close the stream with EOF after all items have been sent. The kubelet
// collects all items from the stream and processes them all at once after
// the stream completes. The kubelet enforces a timeout on the entire stream
// and will discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamPodSandboxes(*StreamPodSandboxesRequest, grpc.ServerStreamingServer[StreamPodSandboxesResponse]) error
// CreateContainer creates a new container in specified PodSandbox
CreateContainer(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
// StartContainer starts the container.
@ -559,6 +740,19 @@ type RuntimeServiceServer interface {
RemoveContainer(context.Context, *RemoveContainerRequest) (*RemoveContainerResponse, error)
// ListContainers lists all containers by filters.
ListContainers(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
// StreamContainers returns a stream of containers.
// This is an alternative to ListContainers that streams results in lists
// of at least one item, avoiding the gRPC message size limit for nodes with
// many containers. The number of items per list may vary depending on the
// container runtime. Each item must appear in exactly one response and must
// not be duplicated across responses in the same stream. The server must
// close the stream with EOF after all items have been sent. The kubelet
// collects all items from the stream and processes them all at once after
// the stream completes. The kubelet enforces a timeout on the entire stream
// and will discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamContainers(*StreamContainersRequest, grpc.ServerStreamingServer[StreamContainersResponse]) error
// ContainerStatus returns status of the container. If the container is not
// present, returns an error.
ContainerStatus(context.Context, *ContainerStatusRequest) (*ContainerStatusResponse, error)
@ -584,11 +778,39 @@ type RuntimeServiceServer interface {
ContainerStats(context.Context, *ContainerStatsRequest) (*ContainerStatsResponse, error)
// ListContainerStats returns stats of all running containers.
ListContainerStats(context.Context, *ListContainerStatsRequest) (*ListContainerStatsResponse, error)
// StreamContainerStats returns a stream of container stats.
// This is an alternative to ListContainerStats that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many containers. The number of items per list may vary
// depending on the container runtime. Each item must appear in exactly one
// response and must not be duplicated across responses in the same stream.
// The server must close the stream with EOF after all items have been sent.
// The kubelet collects all items from the stream and processes them all at
// once after the stream completes. The kubelet enforces a timeout on the
// entire stream and will discard partial results if the stream is not
// completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamContainerStats(*StreamContainerStatsRequest, grpc.ServerStreamingServer[StreamContainerStatsResponse]) error
// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
// exist, the call returns an error.
PodSandboxStats(context.Context, *PodSandboxStatsRequest) (*PodSandboxStatsResponse, error)
// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
ListPodSandboxStats(context.Context, *ListPodSandboxStatsRequest) (*ListPodSandboxStatsResponse, error)
// StreamPodSandboxStats returns a stream of pod sandbox stats.
// This is an alternative to ListPodSandboxStats that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many pods. The number of items per list may vary depending on
// the container runtime. Each item must appear in exactly one response and
// must not be duplicated across responses in the same stream. The server
// must close the stream with EOF after all items have been sent. The
// kubelet collects all items from the stream and processes them all at once
// after the stream completes. The kubelet enforces a timeout on the entire
// stream and will discard partial results if the stream is not completed
// in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamPodSandboxStats(*StreamPodSandboxStatsRequest, grpc.ServerStreamingServer[StreamPodSandboxStatsResponse]) error
// UpdateRuntimeConfig updates the runtime configuration based on the given request.
UpdateRuntimeConfig(context.Context, *UpdateRuntimeConfigRequest) (*UpdateRuntimeConfigResponse, error)
// Status returns the status of the runtime.
@ -605,6 +827,20 @@ type RuntimeServiceServer interface {
ListMetricDescriptors(context.Context, *ListMetricDescriptorsRequest) (*ListMetricDescriptorsResponse, error)
// ListPodSandboxMetrics gets pod sandbox metrics from CRI Runtime
ListPodSandboxMetrics(context.Context, *ListPodSandboxMetricsRequest) (*ListPodSandboxMetricsResponse, error)
// StreamPodSandboxMetrics returns a stream of pod sandbox metrics.
// This is an alternative to ListPodSandboxMetrics that streams results in
// lists of at least one item, avoiding the gRPC message size limit for
// nodes with many pods. The number of items per list may vary depending on
// the container runtime. Each item must appear in exactly one response and
// must not be duplicated across responses in the same stream. The server
// must close the stream with EOF after all items have been sent. The
// kubelet collects all items from the stream and processes them all at once
// after the stream completes. The kubelet enforces a timeout on the entire
// stream and will discard partial results if the stream is not completed
// in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamPodSandboxMetrics(*StreamPodSandboxMetricsRequest, grpc.ServerStreamingServer[StreamPodSandboxMetricsResponse]) error
// RuntimeConfig returns configuration information of the runtime.
// A couple of notes:
// - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.
@ -647,6 +883,9 @@ func (UnimplementedRuntimeServiceServer) PodSandboxStatus(context.Context, *PodS
func (UnimplementedRuntimeServiceServer) ListPodSandbox(context.Context, *ListPodSandboxRequest) (*ListPodSandboxResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListPodSandbox not implemented")
}
func (UnimplementedRuntimeServiceServer) StreamPodSandboxes(*StreamPodSandboxesRequest, grpc.ServerStreamingServer[StreamPodSandboxesResponse]) error {
return status.Errorf(codes.Unimplemented, "method StreamPodSandboxes not implemented")
}
func (UnimplementedRuntimeServiceServer) CreateContainer(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateContainer not implemented")
}
@ -662,6 +901,9 @@ func (UnimplementedRuntimeServiceServer) RemoveContainer(context.Context, *Remov
func (UnimplementedRuntimeServiceServer) ListContainers(context.Context, *ListContainersRequest) (*ListContainersResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListContainers not implemented")
}
func (UnimplementedRuntimeServiceServer) StreamContainers(*StreamContainersRequest, grpc.ServerStreamingServer[StreamContainersResponse]) error {
return status.Errorf(codes.Unimplemented, "method StreamContainers not implemented")
}
func (UnimplementedRuntimeServiceServer) ContainerStatus(context.Context, *ContainerStatusRequest) (*ContainerStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ContainerStatus not implemented")
}
@ -689,12 +931,18 @@ func (UnimplementedRuntimeServiceServer) ContainerStats(context.Context, *Contai
func (UnimplementedRuntimeServiceServer) ListContainerStats(context.Context, *ListContainerStatsRequest) (*ListContainerStatsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListContainerStats not implemented")
}
func (UnimplementedRuntimeServiceServer) StreamContainerStats(*StreamContainerStatsRequest, grpc.ServerStreamingServer[StreamContainerStatsResponse]) error {
return status.Errorf(codes.Unimplemented, "method StreamContainerStats not implemented")
}
func (UnimplementedRuntimeServiceServer) PodSandboxStats(context.Context, *PodSandboxStatsRequest) (*PodSandboxStatsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PodSandboxStats not implemented")
}
func (UnimplementedRuntimeServiceServer) ListPodSandboxStats(context.Context, *ListPodSandboxStatsRequest) (*ListPodSandboxStatsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListPodSandboxStats not implemented")
}
func (UnimplementedRuntimeServiceServer) StreamPodSandboxStats(*StreamPodSandboxStatsRequest, grpc.ServerStreamingServer[StreamPodSandboxStatsResponse]) error {
return status.Errorf(codes.Unimplemented, "method StreamPodSandboxStats not implemented")
}
func (UnimplementedRuntimeServiceServer) UpdateRuntimeConfig(context.Context, *UpdateRuntimeConfigRequest) (*UpdateRuntimeConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateRuntimeConfig not implemented")
}
@ -713,6 +961,9 @@ func (UnimplementedRuntimeServiceServer) ListMetricDescriptors(context.Context,
func (UnimplementedRuntimeServiceServer) ListPodSandboxMetrics(context.Context, *ListPodSandboxMetricsRequest) (*ListPodSandboxMetricsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListPodSandboxMetrics not implemented")
}
func (UnimplementedRuntimeServiceServer) StreamPodSandboxMetrics(*StreamPodSandboxMetricsRequest, grpc.ServerStreamingServer[StreamPodSandboxMetricsResponse]) error {
return status.Errorf(codes.Unimplemented, "method StreamPodSandboxMetrics not implemented")
}
func (UnimplementedRuntimeServiceServer) RuntimeConfig(context.Context, *RuntimeConfigRequest) (*RuntimeConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RuntimeConfig not implemented")
}
@ -848,6 +1099,17 @@ func _RuntimeService_ListPodSandbox_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _RuntimeService_StreamPodSandboxes_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamPodSandboxesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RuntimeServiceServer).StreamPodSandboxes(m, &grpc.GenericServerStream[StreamPodSandboxesRequest, StreamPodSandboxesResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamPodSandboxesServer = grpc.ServerStreamingServer[StreamPodSandboxesResponse]
func _RuntimeService_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateContainerRequest)
if err := dec(in); err != nil {
@ -938,6 +1200,17 @@ func _RuntimeService_ListContainers_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _RuntimeService_StreamContainers_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamContainersRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RuntimeServiceServer).StreamContainers(m, &grpc.GenericServerStream[StreamContainersRequest, StreamContainersResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamContainersServer = grpc.ServerStreamingServer[StreamContainersResponse]
func _RuntimeService_ContainerStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ContainerStatusRequest)
if err := dec(in); err != nil {
@ -1100,6 +1373,17 @@ func _RuntimeService_ListContainerStats_Handler(srv interface{}, ctx context.Con
return interceptor(ctx, in, info, handler)
}
func _RuntimeService_StreamContainerStats_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamContainerStatsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RuntimeServiceServer).StreamContainerStats(m, &grpc.GenericServerStream[StreamContainerStatsRequest, StreamContainerStatsResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamContainerStatsServer = grpc.ServerStreamingServer[StreamContainerStatsResponse]
func _RuntimeService_PodSandboxStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PodSandboxStatsRequest)
if err := dec(in); err != nil {
@ -1136,6 +1420,17 @@ func _RuntimeService_ListPodSandboxStats_Handler(srv interface{}, ctx context.Co
return interceptor(ctx, in, info, handler)
}
func _RuntimeService_StreamPodSandboxStats_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamPodSandboxStatsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RuntimeServiceServer).StreamPodSandboxStats(m, &grpc.GenericServerStream[StreamPodSandboxStatsRequest, StreamPodSandboxStatsResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamPodSandboxStatsServer = grpc.ServerStreamingServer[StreamPodSandboxStatsResponse]
func _RuntimeService_UpdateRuntimeConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateRuntimeConfigRequest)
if err := dec(in); err != nil {
@ -1237,6 +1532,17 @@ func _RuntimeService_ListPodSandboxMetrics_Handler(srv interface{}, ctx context.
return interceptor(ctx, in, info, handler)
}
func _RuntimeService_StreamPodSandboxMetrics_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamPodSandboxMetricsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RuntimeServiceServer).StreamPodSandboxMetrics(m, &grpc.GenericServerStream[StreamPodSandboxMetricsRequest, StreamPodSandboxMetricsResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RuntimeService_StreamPodSandboxMetricsServer = grpc.ServerStreamingServer[StreamPodSandboxMetricsResponse]
func _RuntimeService_RuntimeConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RuntimeConfigRequest)
if err := dec(in); err != nil {
@ -1398,21 +1704,47 @@ var RuntimeService_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamPodSandboxes",
Handler: _RuntimeService_StreamPodSandboxes_Handler,
ServerStreams: true,
},
{
StreamName: "StreamContainers",
Handler: _RuntimeService_StreamContainers_Handler,
ServerStreams: true,
},
{
StreamName: "StreamContainerStats",
Handler: _RuntimeService_StreamContainerStats_Handler,
ServerStreams: true,
},
{
StreamName: "StreamPodSandboxStats",
Handler: _RuntimeService_StreamPodSandboxStats_Handler,
ServerStreams: true,
},
{
StreamName: "GetContainerEvents",
Handler: _RuntimeService_GetContainerEvents_Handler,
ServerStreams: true,
},
{
StreamName: "StreamPodSandboxMetrics",
Handler: _RuntimeService_StreamPodSandboxMetrics_Handler,
ServerStreams: true,
},
},
Metadata: "staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.proto",
}
const (
ImageService_ListImages_FullMethodName = "/runtime.v1.ImageService/ListImages"
ImageService_ImageStatus_FullMethodName = "/runtime.v1.ImageService/ImageStatus"
ImageService_PullImage_FullMethodName = "/runtime.v1.ImageService/PullImage"
ImageService_RemoveImage_FullMethodName = "/runtime.v1.ImageService/RemoveImage"
ImageService_ImageFsInfo_FullMethodName = "/runtime.v1.ImageService/ImageFsInfo"
ImageService_ListImages_FullMethodName = "/runtime.v1.ImageService/ListImages"
ImageService_StreamImages_FullMethodName = "/runtime.v1.ImageService/StreamImages"
ImageService_ImageStatus_FullMethodName = "/runtime.v1.ImageService/ImageStatus"
ImageService_PullImage_FullMethodName = "/runtime.v1.ImageService/PullImage"
ImageService_RemoveImage_FullMethodName = "/runtime.v1.ImageService/RemoveImage"
ImageService_ImageFsInfo_FullMethodName = "/runtime.v1.ImageService/ImageFsInfo"
)
// ImageServiceClient is the client API for ImageService service.
@ -1423,6 +1755,19 @@ const (
type ImageServiceClient interface {
// ListImages lists existing images.
ListImages(ctx context.Context, in *ListImagesRequest, opts ...grpc.CallOption) (*ListImagesResponse, error)
// StreamImages returns a stream of images.
// This is an alternative to ListImages that streams results in lists of at
// least one item, avoiding the gRPC message size limit for nodes with many
// images. The number of items per list may vary depending on the container
// runtime. Each item must appear in exactly one response and must not be
// duplicated across responses in the same stream. The server must close the
// stream with EOF after all items have been sent. The kubelet collects all
// items from the stream and processes them all at once after the stream
// completes. The kubelet enforces a timeout on the entire stream and will
// discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamImages(ctx context.Context, in *StreamImagesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamImagesResponse], error)
// ImageStatus returns the status of the image. If the image is not
// present, returns a response with ImageStatusResponse.Image set to
// nil.
@ -1463,6 +1808,25 @@ func (c *imageServiceClient) ListImages(ctx context.Context, in *ListImagesReque
return out, nil
}
func (c *imageServiceClient) StreamImages(ctx context.Context, in *StreamImagesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamImagesResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &ImageService_ServiceDesc.Streams[0], ImageService_StreamImages_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamImagesRequest, StreamImagesResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type ImageService_StreamImagesClient = grpc.ServerStreamingClient[StreamImagesResponse]
func (c *imageServiceClient) ImageStatus(ctx context.Context, in *ImageStatusRequest, opts ...grpc.CallOption) (*ImageStatusResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ImageStatusResponse)
@ -1511,6 +1875,19 @@ func (c *imageServiceClient) ImageFsInfo(ctx context.Context, in *ImageFsInfoReq
type ImageServiceServer interface {
// ListImages lists existing images.
ListImages(context.Context, *ListImagesRequest) (*ListImagesResponse, error)
// StreamImages returns a stream of images.
// This is an alternative to ListImages that streams results in lists of at
// least one item, avoiding the gRPC message size limit for nodes with many
// images. The number of items per list may vary depending on the container
// runtime. Each item must appear in exactly one response and must not be
// duplicated across responses in the same stream. The server must close the
// stream with EOF after all items have been sent. The kubelet collects all
// items from the stream and processes them all at once after the stream
// completes. The kubelet enforces a timeout on the entire stream and will
// discard partial results if the stream is not completed in time.
// Feature gate: CRIListStreaming
// See https://kep.k8s.io/5825 for more details.
StreamImages(*StreamImagesRequest, grpc.ServerStreamingServer[StreamImagesResponse]) error
// ImageStatus returns the status of the image. If the image is not
// present, returns a response with ImageStatusResponse.Image set to
// nil.
@ -1544,6 +1921,9 @@ type UnimplementedImageServiceServer struct{}
func (UnimplementedImageServiceServer) ListImages(context.Context, *ListImagesRequest) (*ListImagesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListImages not implemented")
}
func (UnimplementedImageServiceServer) StreamImages(*StreamImagesRequest, grpc.ServerStreamingServer[StreamImagesResponse]) error {
return status.Errorf(codes.Unimplemented, "method StreamImages not implemented")
}
func (UnimplementedImageServiceServer) ImageStatus(context.Context, *ImageStatusRequest) (*ImageStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ImageStatus not implemented")
}
@ -1595,6 +1975,17 @@ func _ImageService_ListImages_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _ImageService_StreamImages_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamImagesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(ImageServiceServer).StreamImages(m, &grpc.GenericServerStream[StreamImagesRequest, StreamImagesResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type ImageService_StreamImagesServer = grpc.ServerStreamingServer[StreamImagesResponse]
func _ImageService_ImageStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ImageStatusRequest)
if err := dec(in); err != nil {
@ -1695,6 +2086,12 @@ var ImageService_ServiceDesc = grpc.ServiceDesc{
Handler: _ImageService_ImageFsInfo_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamImages",
Handler: _ImageService_StreamImages_Handler,
ServerStreams: true,
},
},
Metadata: "staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.proto",
}

View file

@ -374,6 +374,60 @@ func (f *RemoteRuntime) UpdatePodSandboxResources(ctx context.Context, req *kube
return f.RuntimeService.UpdatePodSandboxResources(ctx, req)
}
// StreamPodSandboxes returns a stream of PodSandboxes.
func (f *RemoteRuntime) StreamPodSandboxes(req *kubeapi.StreamPodSandboxesRequest, stream kubeapi.RuntimeService_StreamPodSandboxesServer) error {
items, err := f.RuntimeService.ListPodSandbox(stream.Context(), req.Filter)
if err != nil {
return err
}
return stream.Send(&kubeapi.StreamPodSandboxesResponse{PodSandboxes: items})
}
// StreamContainers returns a stream of containers.
func (f *RemoteRuntime) StreamContainers(req *kubeapi.StreamContainersRequest, stream kubeapi.RuntimeService_StreamContainersServer) error {
items, err := f.RuntimeService.ListContainers(stream.Context(), req.Filter)
if err != nil {
return err
}
return stream.Send(&kubeapi.StreamContainersResponse{Containers: items})
}
// StreamContainerStats returns a stream of container stats.
func (f *RemoteRuntime) StreamContainerStats(req *kubeapi.StreamContainerStatsRequest, stream kubeapi.RuntimeService_StreamContainerStatsServer) error {
stats, err := f.RuntimeService.ListContainerStats(stream.Context(), req.Filter)
if err != nil {
return err
}
return stream.Send(&kubeapi.StreamContainerStatsResponse{ContainerStats: stats})
}
// StreamPodSandboxStats returns a stream of pod sandbox stats.
func (f *RemoteRuntime) StreamPodSandboxStats(req *kubeapi.StreamPodSandboxStatsRequest, stream kubeapi.RuntimeService_StreamPodSandboxStatsServer) error {
stats, err := f.RuntimeService.ListPodSandboxStats(stream.Context(), req.Filter)
if err != nil {
return err
}
return stream.Send(&kubeapi.StreamPodSandboxStatsResponse{PodSandboxStats: stats})
}
// StreamPodSandboxMetrics returns a stream of pod sandbox metrics.
func (f *RemoteRuntime) StreamPodSandboxMetrics(req *kubeapi.StreamPodSandboxMetricsRequest, stream kubeapi.RuntimeService_StreamPodSandboxMetricsServer) error {
podMetrics, err := f.RuntimeService.ListPodSandboxMetrics(stream.Context())
if err != nil {
return err
}
return stream.Send(&kubeapi.StreamPodSandboxMetricsResponse{PodSandboxMetrics: podMetrics})
}
// StreamImages returns a stream of images.
func (f *RemoteRuntime) StreamImages(req *kubeapi.StreamImagesRequest, stream kubeapi.ImageService_StreamImagesServer) error {
images, err := f.ImageService.ListImages(stream.Context(), req.Filter)
if err != nil {
return err
}
return stream.Send(&kubeapi.StreamImagesResponse{Images: images})
}
// Close will shutdown the internal gRPC client connection.
func (f *RemoteRuntime) Close(ctx context.Context) error {
return f.RuntimeService.Close(ctx)

View file

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"io"
"time"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@ -43,10 +44,21 @@ type remoteImageService struct {
timeout time.Duration
imageClient runtimeapi.ImageServiceClient
conn *grpc.ClientConn
// useStreaming indicates whether to use streaming RPCs for list operations
// when the CRIListStreaming feature gate is enabled. It falls back to false
// if a streaming RPC returns Unimplemented. The fallback is racy but kept
// simple since the worst case is a few extra fallback attempts.
useStreaming bool
}
// NewRemoteImageService creates a new internalapi.ImageManagerService.
func NewRemoteImageService(ctx context.Context, endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.ImageManagerService, error) {
// If useStreaming is true, streaming RPCs will be used for list operations
// instead of unary RPCs. If the runtime returns an Unimplemented error,
// the client automatically falls back to unary RPCs.
// NOTE: useStreaming is supposed to be gated by the CRIListStreaming feature
// gate and is expected to default to true once the feature graduates to GA,
// at which point this parameter may be removed.
func NewRemoteImageService(ctx context.Context, endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider, useStreaming bool) (internalapi.ImageManagerService, error) {
logger := klog.FromContext(ctx)
logger.V(3).Info("Connecting to image service", "endpoint", endpoint)
addr, dialer, err := util.GetAddressAndDialer(endpoint)
@ -92,8 +104,9 @@ func NewRemoteImageService(ctx context.Context, endpoint string, connectionTimeo
}
service := &remoteImageService{
timeout: connectionTimeout,
conn: conn,
timeout: connectionTimeout,
conn: conn,
useStreaming: useStreaming,
}
if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil {
return nil, fmt.Errorf("validate service connection: %w", err)
@ -130,6 +143,9 @@ func (r *remoteImageService) ListImages(ctx context.Context, filter *runtimeapi.
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
if r.useStreaming {
return r.streamImagesV1(ctx, filter)
}
return r.listImagesV1(ctx, filter)
}
@ -146,6 +162,40 @@ func (r *remoteImageService) listImagesV1(ctx context.Context, filter *runtimeap
return resp.Images, nil
}
func (r *remoteImageService) streamImagesV1(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
logger := klog.FromContext(ctx)
stream, err := r.imageClient.StreamImages(ctx, &runtimeapi.StreamImagesRequest{
Filter: filter,
})
if err != nil {
logger.Error(err, "StreamImages from image service failed", "filter", filter)
return nil, err
}
var images []*runtimeapi.Image
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
// If the RPC is unimplemented, disable streaming and fall back to the unary RPC.
// The Unimplemented status is not returned when creating the stream,
// but when calling Recv() on the stream.
if status.Code(err) == codes.Unimplemented {
logger.Info("StreamImages not implemented, falling back to ListImages", "filter", filter)
r.useStreaming = false
return r.listImagesV1(ctx, filter)
}
logger.Error(err, "StreamImages recv failed", "filter", filter, "itemsReceived", len(images))
return nil, err
}
images = append(images, resp.Images...)
}
return images, nil
}
// ImageStatus returns the status of the image.
func (r *remoteImageService) ImageStatus(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error) {
ctx, cancel := context.WithTimeout(ctx, r.timeout)

View file

@ -33,14 +33,14 @@ import (
)
func createRemoteImageServiceWithTracerProvider(ctx context.Context, endpoint string, tp oteltrace.TracerProvider, t *testing.T) internalapi.ImageManagerService {
imageService, err := NewRemoteImageService(ctx, endpoint, defaultConnectionTimeout, tp)
imageService, err := NewRemoteImageService(ctx, endpoint, defaultConnectionTimeout, tp, false)
require.NoError(t, err)
return imageService
}
func createRemoteImageServiceWithoutTracerProvider(ctx context.Context, endpoint string, t *testing.T) internalapi.ImageManagerService {
imageService, err := NewRemoteImageService(ctx, endpoint, defaultConnectionTimeout, noop.NewTracerProvider())
imageService, err := NewRemoteImageService(ctx, endpoint, defaultConnectionTimeout, noop.NewTracerProvider(), false)
require.NoError(t, err)
return imageService

View file

@ -49,6 +49,11 @@ type remoteRuntimeService struct {
// Cache last per-container error message to reduce log spam
logReduction *logreduction.LogReduction
conn *grpc.ClientConn
// useStreaming indicates whether to use streaming RPCs for list operations
// when the CRIListStreaming feature gate is enabled. It falls back to false
// if a streaming RPC returns Unimplemented. The fallback is racy but kept
// simple since the worst case is a few extra fallback attempts.
useStreaming bool
}
const (
@ -79,7 +84,13 @@ const (
)
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(ctx context.Context, endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
// If useStreaming is true, streaming RPCs will be used for list operations
// instead of unary RPCs. If the runtime returns an Unimplemented error,
// the client automatically falls back to unary RPCs.
// NOTE: useStreaming is supposed to be gated by the CRIListStreaming feature
// gate and is expected to default to true once the feature graduates to GA,
// at which point this parameter may be removed.
func NewRemoteRuntimeService(ctx context.Context, endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider, useStreaming bool) (internalapi.RuntimeService, error) {
logger := klog.FromContext(ctx)
logger.V(3).Info("Connecting to runtime service", "endpoint", endpoint)
addr, dialer, err := util.GetAddressAndDialer(endpoint)
@ -127,6 +138,7 @@ func NewRemoteRuntimeService(ctx context.Context, endpoint string, connectionTim
timeout: connectionTimeout,
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
conn: conn,
useStreaming: useStreaming,
}
if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil {
@ -318,6 +330,9 @@ func (r *remoteRuntimeService) ListPodSandbox(ctx context.Context, filter *runti
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
if r.useStreaming {
return r.streamPodSandboxesV1(ctx, filter)
}
return r.listPodSandboxV1(ctx, filter)
}
@ -336,6 +351,42 @@ func (r *remoteRuntimeService) listPodSandboxV1(ctx context.Context, filter *run
return resp.Items, nil
}
func (r *remoteRuntimeService) streamPodSandboxesV1(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
logger := klog.FromContext(ctx)
stream, err := r.runtimeClient.StreamPodSandboxes(ctx, &runtimeapi.StreamPodSandboxesRequest{
Filter: filter,
})
if err != nil {
logger.Error(err, "StreamPodSandboxes from runtime service failed", "filter", filter)
return nil, err
}
var items []*runtimeapi.PodSandbox
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
// If the RPC is unimplemented, disable streaming and fall back to the unary RPC.
// The Unimplemented status is not returned when creating the stream,
// but when calling Recv() on the stream.
if status.Code(err) == codes.Unimplemented {
logger.Info("StreamPodSandboxes not implemented, falling back to ListPodSandbox", "filter", filter)
r.useStreaming = false
return r.listPodSandboxV1(ctx, filter)
}
logger.Error(err, "StreamPodSandboxes recv failed", "filter", filter, "itemsReceived", len(items))
return nil, err
}
items = append(items, resp.PodSandboxes...)
}
logger.V(10).Info("[RemoteRuntimeService] StreamPodSandboxes Response", "filter", filter, "items", items)
return items, nil
}
// CreateContainer creates a new container in the specified PodSandbox.
func (r *remoteRuntimeService) CreateContainer(ctx context.Context, podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
logger := klog.FromContext(ctx)
@ -438,6 +489,9 @@ func (r *remoteRuntimeService) ListContainers(ctx context.Context, filter *runti
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
if r.useStreaming {
return r.streamContainersV1(ctx, filter)
}
return r.listContainersV1(ctx, filter)
}
@ -455,6 +509,42 @@ func (r *remoteRuntimeService) listContainersV1(ctx context.Context, filter *run
return resp.Containers, nil
}
func (r *remoteRuntimeService) streamContainersV1(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
logger := klog.FromContext(ctx)
stream, err := r.runtimeClient.StreamContainers(ctx, &runtimeapi.StreamContainersRequest{
Filter: filter,
})
if err != nil {
logger.Error(err, "StreamContainers from runtime service failed", "filter", filter)
return nil, err
}
var containers []*runtimeapi.Container
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
// If the RPC is unimplemented, disable streaming and fall back to the unary RPC.
// The Unimplemented status is not returned when creating the stream,
// but when calling Recv() on the stream.
if status.Code(err) == codes.Unimplemented {
logger.Info("StreamContainers not implemented, falling back to ListContainers", "filter", filter)
r.useStreaming = false
return r.listContainersV1(ctx, filter)
}
logger.Error(err, "StreamContainers recv failed", "filter", filter, "itemsReceived", len(containers))
return nil, err
}
containers = append(containers, resp.Containers...)
}
logger.V(10).Info("[RemoteRuntimeService] StreamContainers Response", "filter", filter, "containers", containers)
return containers, nil
}
// ContainerStatus returns the container status.
func (r *remoteRuntimeService) ContainerStatus(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) {
logger := klog.FromContext(ctx)
@ -756,6 +846,9 @@ func (r *remoteRuntimeService) ListContainerStats(ctx context.Context, filter *r
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
if r.useStreaming {
return r.streamContainerStatsV1(ctx, filter)
}
return r.listContainerStatsV1(ctx, filter)
}
@ -773,6 +866,42 @@ func (r *remoteRuntimeService) listContainerStatsV1(ctx context.Context, filter
return resp.GetStats(), nil
}
func (r *remoteRuntimeService) streamContainerStatsV1(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) {
logger := klog.FromContext(ctx)
stream, err := r.runtimeClient.StreamContainerStats(ctx, &runtimeapi.StreamContainerStatsRequest{
Filter: filter,
})
if err != nil {
logger.Error(err, "StreamContainerStats from runtime service failed", "filter", filter)
return nil, err
}
var stats []*runtimeapi.ContainerStats
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
// If the RPC is unimplemented, disable streaming and fall back to the unary RPC.
// The Unimplemented status is not returned when creating the stream,
// but when calling Recv() on the stream.
if status.Code(err) == codes.Unimplemented {
logger.Info("StreamContainerStats not implemented, falling back to ListContainerStats", "filter", filter)
r.useStreaming = false
return r.listContainerStatsV1(ctx, filter)
}
logger.Error(err, "StreamContainerStats recv failed", "filter", filter, "itemsReceived", len(stats))
return nil, err
}
stats = append(stats, resp.ContainerStats...)
}
logger.V(10).Info("[RemoteRuntimeService] StreamContainerStats Response", "filter", filter, "stats", stats)
return stats, nil
}
// PodSandboxStats returns the stats of the pod.
func (r *remoteRuntimeService) PodSandboxStats(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error) {
logger := klog.FromContext(ctx)
@ -808,6 +937,9 @@ func (r *remoteRuntimeService) ListPodSandboxStats(ctx context.Context, filter *
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
if r.useStreaming {
return r.streamPodSandboxStatsV1(ctx, filter)
}
return r.listPodSandboxStatsV1(ctx, filter)
}
@ -825,6 +957,42 @@ func (r *remoteRuntimeService) listPodSandboxStatsV1(ctx context.Context, filter
return resp.GetStats(), nil
}
func (r *remoteRuntimeService) streamPodSandboxStatsV1(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error) {
logger := klog.FromContext(ctx)
stream, err := r.runtimeClient.StreamPodSandboxStats(ctx, &runtimeapi.StreamPodSandboxStatsRequest{
Filter: filter,
})
if err != nil {
logger.Error(err, "StreamPodSandboxStats from runtime service failed", "filter", filter)
return nil, err
}
var stats []*runtimeapi.PodSandboxStats
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
// If the RPC is unimplemented, disable streaming and fall back to the unary RPC.
// The Unimplemented status is not returned when creating the stream,
// but when calling Recv() on the stream.
if status.Code(err) == codes.Unimplemented {
logger.Info("StreamPodSandboxStats not implemented, falling back to ListPodSandboxStats", "filter", filter)
r.useStreaming = false
return r.listPodSandboxStatsV1(ctx, filter)
}
logger.Error(err, "StreamPodSandboxStats recv failed", "filter", filter, "itemsReceived", len(stats))
return nil, err
}
stats = append(stats, resp.PodSandboxStats...)
}
logger.V(10).Info("[RemoteRuntimeService] StreamPodSandboxStats Response", "filter", filter, "stats", stats)
return stats, nil
}
// ReopenContainerLog reopens the container log file.
func (r *remoteRuntimeService) ReopenContainerLog(ctx context.Context, containerID string) (err error) {
logger := klog.FromContext(ctx)
@ -908,7 +1076,7 @@ func (r *remoteRuntimeService) GetContainerEvents(ctx context.Context, container
for {
resp, err := containerEventsStreamingClient.Recv()
if err == io.EOF {
if errors.Is(err, io.EOF) {
logger.Error(err, "container events stream is closed")
return err
}
@ -944,6 +1112,13 @@ func (r *remoteRuntimeService) ListPodSandboxMetrics(ctx context.Context) ([]*ru
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
if r.useStreaming {
return r.streamPodSandboxMetricsV1(ctx)
}
return r.listPodSandboxMetricsV1(ctx)
}
func (r *remoteRuntimeService) listPodSandboxMetricsV1(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
resp, err := r.runtimeClient.ListPodSandboxMetrics(ctx, &runtimeapi.ListPodSandboxMetricsRequest{})
logger := klog.FromContext(ctx)
if err != nil {
@ -955,6 +1130,40 @@ func (r *remoteRuntimeService) ListPodSandboxMetrics(ctx context.Context) ([]*ru
return resp.GetPodMetrics(), nil
}
func (r *remoteRuntimeService) streamPodSandboxMetricsV1(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
logger := klog.FromContext(ctx)
stream, err := r.runtimeClient.StreamPodSandboxMetrics(ctx, &runtimeapi.StreamPodSandboxMetricsRequest{})
if err != nil {
logger.Error(err, "StreamPodSandboxMetrics from runtime service failed")
return nil, err
}
var metrics []*runtimeapi.PodSandboxMetrics
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
// If the RPC is unimplemented, disable streaming and fall back to the unary RPC.
// The Unimplemented status is not returned when creating the stream,
// but when calling Recv() on the stream.
if status.Code(err) == codes.Unimplemented {
logger.Info("StreamPodSandboxMetrics not implemented, falling back to ListPodSandboxMetrics")
r.useStreaming = false
return r.listPodSandboxMetricsV1(ctx)
}
logger.Error(err, "StreamPodSandboxMetrics recv failed", "itemsReceived", len(metrics))
return nil, err
}
metrics = append(metrics, resp.PodSandboxMetrics...)
}
logger.V(10).Info("[RemoteRuntimeService] StreamPodSandboxMetrics Response", "metrics", metrics)
return metrics, nil
}
// RuntimeConfig returns the configuration information of the runtime.
func (r *remoteRuntimeService) RuntimeConfig(ctx context.Context) (*runtimeapi.RuntimeConfigResponse, error) {
ctx, cancel := context.WithTimeout(ctx, r.timeout)

View file

@ -51,7 +51,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
}
func createRemoteRuntimeService(ctx context.Context, endpoint string, t *testing.T) internalapi.RuntimeService {
runtimeService, err := NewRemoteRuntimeService(ctx, endpoint, defaultConnectionTimeout, noop.NewTracerProvider())
runtimeService, err := NewRemoteRuntimeService(ctx, endpoint, defaultConnectionTimeout, noop.NewTracerProvider(), false)
require.NoError(t, err)
@ -59,7 +59,7 @@ func createRemoteRuntimeService(ctx context.Context, endpoint string, t *testing
}
func createRemoteRuntimeServiceWithTracerProvider(ctx context.Context, endpoint string, tp oteltrace.TracerProvider, t *testing.T) internalapi.RuntimeService {
runtimeService, err := NewRemoteRuntimeService(ctx, endpoint, defaultConnectionTimeout, tp)
runtimeService, err := NewRemoteRuntimeService(ctx, endpoint, defaultConnectionTimeout, tp, false)
require.NoError(t, err)
return runtimeService

View file

@ -29,6 +29,7 @@
| CPUManagerPolicyOptions | :ballot_box_with_check:&nbsp;1.23+ | :closed_lock_with_key:&nbsp;1.33+ | 1.22 | 1.231.32 | 1.33 | | | [code](https://cs.k8s.io/?q=%5CbCPUManagerPolicyOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCPUManagerPolicyOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| CRDObservedGenerationTracking | :ballot_box_with_check:&nbsp;1.36+ | | | 1.35 | | | | [code](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| CRDValidationRatcheting | :ballot_box_with_check:&nbsp;1.30+ | :closed_lock_with_key:&nbsp;1.33+ | 1.281.29 | 1.301.32 | 1.33 | | | [code](https://cs.k8s.io/?q=%5CbCRDValidationRatcheting%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDValidationRatcheting%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| CRIListStreaming | | | 1.36 | | | | | [code](https://cs.k8s.io/?q=%5CbCRIListStreaming%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRIListStreaming%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| CSIServiceAccountTokenSecrets | :ballot_box_with_check:&nbsp;1.35+ | :closed_lock_with_key:&nbsp;1.36+ | | 1.35 | 1.36 | | | [code](https://cs.k8s.io/?q=%5CbCSIServiceAccountTokenSecrets%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCSIServiceAccountTokenSecrets%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| CSIVolumeHealth | | | 1.21 | | | | | [code](https://cs.k8s.io/?q=%5CbCSIVolumeHealth%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCSIVolumeHealth%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| ChangeContainerStatusOnKubeletRestart | :ballot_box_with_check:&nbsp;1.0+ | | | | 1.01.34 | 1.35 | | [code](https://cs.k8s.io/?q=%5CbChangeContainerStatusOnKubeletRestart%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbChangeContainerStatusOnKubeletRestart%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |

View file

@ -399,6 +399,12 @@
lockToDefault: true
preRelease: GA
version: "1.33"
- name: CRIListStreaming
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.36"
- name: CrossNamespaceVolumeDataSource
versionedSpecs:
- default: false

View file

@ -28,6 +28,10 @@ const (
PullImage = "PullImage"
RemoveImage = "RemoveImage"
ImageFsInfo = "ImageFsInfo"
// Streaming APIs
StreamImages = "StreamImages"
// Per-send injection point for streaming APIs, called before each stream.Send().
StreamImagesSend = "StreamImagesSend"
)
// ListImages lists existing images.
@ -102,3 +106,24 @@ func (p *RemoteRuntime) ImageFsInfo(ctx context.Context, req *kubeapi.ImageFsInf
}
return resp, nil
}
// StreamImages returns a stream of images.
func (p *RemoteRuntime) StreamImages(req *kubeapi.StreamImagesRequest, stream kubeapi.ImageService_StreamImagesServer) error {
if err := p.runInjectors(StreamImages); err != nil {
return err
}
images, err := p.imageService.ListImages(stream.Context(), req.Filter)
if err != nil {
return err
}
for _, image := range images {
if err := p.runInjectors(StreamImagesSend); err != nil {
return err
}
if err := stream.Send(&kubeapi.StreamImagesResponse{Images: []*kubeapi.Image{image}}); err != nil {
return err
}
}
return nil
}

View file

@ -64,6 +64,15 @@ const (
ListPodSandboxMetrics = "ListPodSandboxMetrics"
RuntimeConfig = "RuntimeConfig"
UpdatePodSandboxResources = "UpdatePodSandboxResources"
// Streaming APIs
StreamPodSandboxes = "StreamPodSandboxes"
StreamContainers = "StreamContainers"
StreamContainerStats = "StreamContainerStats"
StreamPodSandboxStats = "StreamPodSandboxStats"
StreamPodSandboxMetrics = "StreamPodSandboxMetrics"
// Per-send injection points for streaming APIs, called before each stream.Send().
StreamPodSandboxesSend = "StreamPodSandboxesSend"
StreamContainersSend = "StreamContainersSend"
)
// AddInjector inject the error or delay to the next call to the RuntimeService.
@ -533,3 +542,99 @@ func (p *RemoteRuntime) RuntimeConfig(ctx context.Context, req *runtimeapi.Runti
}
return resp, nil
}
// StreamPodSandboxes returns a stream of PodSandboxes.
func (p *RemoteRuntime) StreamPodSandboxes(req *runtimeapi.StreamPodSandboxesRequest, stream runtimeapi.RuntimeService_StreamPodSandboxesServer) error {
if err := p.runInjectors(StreamPodSandboxes); err != nil {
return err
}
items, err := p.runtimeService.ListPodSandbox(stream.Context(), req.Filter)
if err != nil {
return err
}
for _, item := range items {
if err := p.runInjectors(StreamPodSandboxesSend); err != nil {
return err
}
if err := stream.Send(&runtimeapi.StreamPodSandboxesResponse{PodSandboxes: []*runtimeapi.PodSandbox{item}}); err != nil {
return err
}
}
return nil
}
// StreamContainers returns a stream of containers.
func (p *RemoteRuntime) StreamContainers(req *runtimeapi.StreamContainersRequest, stream runtimeapi.RuntimeService_StreamContainersServer) error {
if err := p.runInjectors(StreamContainers); err != nil {
return err
}
items, err := p.runtimeService.ListContainers(stream.Context(), req.Filter)
if err != nil {
return err
}
for _, item := range items {
if err := p.runInjectors(StreamContainersSend); err != nil {
return err
}
if err := stream.Send(&runtimeapi.StreamContainersResponse{Containers: []*runtimeapi.Container{item}}); err != nil {
return err
}
}
return nil
}
// StreamContainerStats returns a stream of container stats.
func (p *RemoteRuntime) StreamContainerStats(req *runtimeapi.StreamContainerStatsRequest, stream runtimeapi.RuntimeService_StreamContainerStatsServer) error {
if err := p.runInjectors(StreamContainerStats); err != nil {
return err
}
stats, err := p.runtimeService.ListContainerStats(stream.Context(), req.Filter)
if err != nil {
return err
}
for _, stat := range stats {
if err := stream.Send(&runtimeapi.StreamContainerStatsResponse{ContainerStats: []*runtimeapi.ContainerStats{stat}}); err != nil {
return err
}
}
return nil
}
// StreamPodSandboxStats returns a stream of pod sandbox stats.
func (p *RemoteRuntime) StreamPodSandboxStats(req *runtimeapi.StreamPodSandboxStatsRequest, stream runtimeapi.RuntimeService_StreamPodSandboxStatsServer) error {
if err := p.runInjectors(StreamPodSandboxStats); err != nil {
return err
}
stats, err := p.runtimeService.ListPodSandboxStats(stream.Context(), req.Filter)
if err != nil {
return err
}
for _, stat := range stats {
if err := stream.Send(&runtimeapi.StreamPodSandboxStatsResponse{PodSandboxStats: []*runtimeapi.PodSandboxStats{stat}}); err != nil {
return err
}
}
return nil
}
// StreamPodSandboxMetrics returns a stream of pod sandbox metrics.
func (p *RemoteRuntime) StreamPodSandboxMetrics(req *runtimeapi.StreamPodSandboxMetricsRequest, stream runtimeapi.RuntimeService_StreamPodSandboxMetricsServer) error {
if err := p.runInjectors(StreamPodSandboxMetrics); err != nil {
return err
}
podMetrics, err := p.runtimeService.ListPodSandboxMetrics(stream.Context())
if err != nil {
return err
}
for _, metric := range podMetrics {
if err := stream.Send(&runtimeapi.StreamPodSandboxMetricsResponse{PodSandboxMetrics: []*runtimeapi.PodSandboxMetrics{metric}}); err != nil {
return err
}
}
return nil
}

View file

@ -24,10 +24,13 @@ import (
"fmt"
"os"
"strings"
"sync/atomic"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -251,6 +254,163 @@ var _ = SIGDescribe(feature.CriProxy, framework.WithSerial(), func() {
gomega.Expect(imagePullDuration).To(gomega.BeNumerically(">=", delayTime), "PullImages should take more than 10 seconds")
})
})
// CRI streaming API tests
framework.Context("CRI streaming list operations", feature.CriProxy, framework.WithFeatureGate(features.CRIListStreaming), func() {
ginkgo.BeforeEach(func() {
if err := resetCRIProxyInjector(e2eCriProxy); err != nil {
ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
}
ginkgo.DeferCleanup(func() error {
return resetCRIProxyInjector(e2eCriProxy)
})
})
ginkgo.It("should use streaming RPCs for listing pods and containers", func(ctx context.Context) {
// Track which streaming APIs were called
apiCalled := make(map[string]bool)
err := addCRIProxyInjector(e2eCriProxy, func(apiName string) error {
apiCalled[apiName] = true
return nil
})
framework.ExpectNoError(err)
// Wait for kubelet to make list calls (which should use streaming when enabled)
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(apiCalled[criproxy.StreamContainers]).To(gomega.BeTrueBecause("StreamContainers should be called"))
g.Expect(apiCalled[criproxy.StreamPodSandboxes]).To(gomega.BeTrueBecause("StreamPodSandboxes should be called"))
g.Expect(apiCalled[criproxy.ListContainers]).To(gomega.BeFalseBecause("ListContainers should not be called"))
g.Expect(apiCalled[criproxy.ListPodSandbox]).To(gomega.BeFalseBecause("ListPodSandbox should not be called"))
}).WithPolling(1 * time.Second).WithTimeout(1 * time.Minute).Should(gomega.Succeed())
})
ginkgo.It("should handle mid-stream errors", func(ctx context.Context) {
// Create a pod so that StreamContainersSend is called at least twice
// (once per container), allowing us to inject a mid-stream error
// after the first item is successfully sent.
e2epod.NewPodClient(f).CreateSync(ctx, newPausePodWithContainers(2))
// Track per-call send count so we can fail after the first item
// is successfully sent, simulating a mid-stream failure.
var perCallSendCount atomic.Int32
var midStreamErrors atomic.Int32
var listFallbackCalls atomic.Int32
err := addCRIProxyInjector(e2eCriProxy, func(apiName string) error {
switch apiName {
case criproxy.StreamContainers:
// Reset per-call counter at the start of each streaming call
perCallSendCount.Store(0)
case criproxy.StreamContainersSend:
if perCallSendCount.Add(1) > 1 {
midStreamErrors.Add(1)
return status.Error(codes.Internal, "injected mid-stream error")
}
case criproxy.ListContainers:
listFallbackCalls.Add(1)
}
return nil
})
framework.ExpectNoError(err)
// Wait for the mid-stream error to be triggered at least once
gomega.Eventually(func() bool {
return midStreamErrors.Load() > 0
}).WithPolling(1 * time.Second).WithTimeout(1 * time.Minute).Should(
gomega.BeTrueBecause("Expected mid-stream error to be triggered during StreamContainers"))
// Verify no fallback to unary RPC (Internal errors should NOT trigger fallback)
gomega.Expect(listFallbackCalls.Load()).To(gomega.Equal(int32(0)),
"Non-Unimplemented errors should not trigger fallback to ListContainers")
})
ginkgo.It("should handle streaming timeout", func(ctx context.Context) {
// Create a pod so that StreamContainersSend is called at least twice,
// allowing us to block on the second send to simulate a timeout.
e2epod.NewPodClient(f).CreateSync(ctx, newPausePodWithContainers(5))
var perCallSendCount atomic.Int32
var listFallbackCalls atomic.Int32
err := addCRIProxyInjector(e2eCriProxy, func(apiName string) error {
switch apiName {
case criproxy.StreamContainers:
// Reset per-call counter at the start of each streaming call
perCallSendCount.Store(0)
case criproxy.StreamContainersSend:
// Simulate a slow runtime by waiting one minute per item;
// the client's context timeout will fire and the Recv()
// will return DeadlineExceeded.
time.Sleep(1 * time.Minute)
perCallSendCount.Add(1)
case criproxy.ListContainers:
listFallbackCalls.Add(1)
}
return nil
})
framework.ExpectNoError(err)
// Ensure the number of containers sent per streaming call never exceeds 2,
// because the connection timeout is set to 2 mins.
// It confirms the client times out behave the same as List methods.
gomega.Eventually(func() bool {
return perCallSendCount.Load() > 0
}).WithPolling(1 * time.Second).WithTimeout(3 * time.Minute).Should(
gomega.BeTrueBecause("Expected at least one StreamContainersSend call"))
gomega.Expect(perCallSendCount.Load()).To(gomega.BeNumerically("<=", int32(2)),
"Expected no more than 2 containers to be sent before timeout")
// Wait for the kubelet to log the streaming recv failure caused by the timeout
gomega.Eventually(func() error {
return verifyErrorInKubeletLogs("StreamContainers recv failed")
}).WithPolling(5*time.Second).WithTimeout(3*time.Minute).Should(gomega.Succeed(),
"Expected kubelet to log a StreamContainers recv failure due to timeout")
// Verify no fallback to unary RPC (timeout errors should NOT trigger fallback)
gomega.Expect(listFallbackCalls.Load()).To(gomega.Equal(int32(0)),
"Timeout errors should not trigger fallback to ListContainers")
})
// Each fallback test restarts the kubelet to ensure a fresh CRI client
// with useStreaming=true, since triggering the Unimplemented fallback
// permanently disables streaming on the kubelet's CRI client.
ginkgo.It("should fall back to unary RPC when streaming returns Unimplemented", func(ctx context.Context) {
// Restart kubelet on cleanup to reset the useStreaming flag,
// which is cached from the first streaming attempt.
ginkgo.DeferCleanup(func(ctx context.Context) error {
err := resetCRIProxyInjector(e2eCriProxy)
if err != nil {
return err
}
restartKubelet(ctx, true)
waitForKubeletToStart(ctx, f)
return nil
})
var streamCallCount atomic.Int32
var listCallCount atomic.Int32
err := addCRIProxyInjector(e2eCriProxy, func(apiName string) error {
switch apiName {
case criproxy.StreamContainers:
streamCallCount.Add(1)
return status.Error(codes.Unimplemented, "streaming not supported")
case criproxy.ListContainers:
listCallCount.Add(1)
}
return nil
})
framework.ExpectNoError(err)
gomega.Eventually(func() bool {
return listCallCount.Load() > 0
}).WithPolling(1 * time.Second).WithTimeout(1 * time.Minute).Should(
gomega.BeTrueBecause("Expected fallback to ListContainers after StreamContainers returned Unimplemented"))
gomega.Expect(streamCallCount.Load()).To(gomega.BeNumerically(">=", int32(1)),
"Expected StreamContainers to be attempted at least once before falling back")
})
})
})
func getFailedToPullImageMsg(ctx context.Context, f *framework.Framework, podName string) (string, error) {
@ -312,6 +472,25 @@ func newPullImageAlwaysPod() *v1.Pod {
return pod
}
func newPausePodWithContainers(count int) *v1.Pod {
podName := "cri-proxy-test-" + string(uuid.NewUUID())
var containers []v1.Container
for i := range count {
containers = append(containers, v1.Container{
Name: fmt.Sprintf("pause-%d", i),
Image: imageutils.GetPauseImageName(),
})
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
Containers: containers,
},
}
}
func verifyErrorInKubeletLogs(errorMsg string) error {
kubeletLog, err := os.ReadFile(framework.TestContext.ReportDir + "/kubelet.log")
if err != nil {

View file

@ -54,6 +54,7 @@ import (
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubelet/pkg/types"
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm"
@ -284,7 +285,8 @@ func getCRIClient(ctx context.Context) (internalapi.RuntimeService, internalapi.
// connection timeout for CRI service connection
const connectionTimeout = 2 * time.Minute
runtimeEndpoint := framework.TestContext.ContainerRuntimeEndpoint
r, err := remote.NewRemoteRuntimeService(ctx, runtimeEndpoint, connectionTimeout, noop.NewTracerProvider())
useStreaming := utilfeature.DefaultFeatureGate.Enabled(features.CRIListStreaming)
r, err := remote.NewRemoteRuntimeService(ctx, runtimeEndpoint, connectionTimeout, noop.NewTracerProvider(), useStreaming)
if err != nil {
return nil, nil, err
}
@ -294,7 +296,7 @@ func getCRIClient(ctx context.Context) (internalapi.RuntimeService, internalapi.
//explicitly specified
imageManagerEndpoint = framework.TestContext.ImageServiceEndpoint
}
i, err := remote.NewRemoteImageService(ctx, imageManagerEndpoint, connectionTimeout, noop.NewTracerProvider())
i, err := remote.NewRemoteImageService(ctx, imageManagerEndpoint, connectionTimeout, noop.NewTracerProvider(), useStreaming)
if err != nil {
return nil, nil, err
}