From 0acd0a446885ee30683eadbf9ea6efe7b04e482d Mon Sep 17 00:00:00 2001 From: Ayato Tokubi Date: Sat, 21 Mar 2026 16:11:11 +0000 Subject: [PATCH] cri-client: use atomic.Bool for useStreaming to fix data race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace plain bool with sync/atomic.Bool for the useStreaming field in remoteRuntimeService and remoteImageService to eliminate a data race when multiple goroutines concurrently read/write the field during Unimplemented fallback. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --- .../src/k8s.io/cri-client/pkg/remote_image.go | 19 +++++++----- .../k8s.io/cri-client/pkg/remote_runtime.go | 30 ++++++++++--------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/staging/src/k8s.io/cri-client/pkg/remote_image.go b/staging/src/k8s.io/cri-client/pkg/remote_image.go index e6f975eece0..0a3ab3dd335 100644 --- a/staging/src/k8s.io/cri-client/pkg/remote_image.go +++ b/staging/src/k8s.io/cri-client/pkg/remote_image.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "sync/atomic" "time" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -46,9 +47,10 @@ type remoteImageService struct { conn *grpc.ClientConn // useStreaming indicates whether to use streaming RPCs for list operations // when the CRIListStreaming feature gate is enabled. It falls back to false - // if a streaming RPC returns Unimplemented. The fallback is racy but kept - // simple since the worst case is a few extra fallback attempts. - useStreaming bool + // if a streaming RPC returns Unimplemented. Multiple goroutines may + // concurrently observe Unimplemented and store false, but that is harmless + // because the store is idempotent. + useStreaming atomic.Bool } // NewRemoteImageService creates a new internalapi.ImageManagerService. @@ -104,10 +106,11 @@ func NewRemoteImageService(ctx context.Context, endpoint string, connectionTimeo } service := &remoteImageService{ - timeout: connectionTimeout, - conn: conn, - useStreaming: useStreaming, + timeout: connectionTimeout, + conn: conn, } + service.useStreaming.Store(useStreaming) + if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil { return nil, fmt.Errorf("validate service connection: %w", err) } @@ -143,7 +146,7 @@ func (r *remoteImageService) ListImages(ctx context.Context, filter *runtimeapi. ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() - if r.useStreaming { + if r.useStreaming.Load() { return r.streamImagesV1(ctx, filter) } return r.listImagesV1(ctx, filter) @@ -184,7 +187,7 @@ func (r *remoteImageService) streamImagesV1(ctx context.Context, filter *runtime // but when calling Recv() on the stream. if status.Code(err) == codes.Unimplemented { logger.Info("StreamImages not implemented, falling back to ListImages", "filter", filter) - r.useStreaming = false + r.useStreaming.Store(false) return r.listImagesV1(ctx, filter) } logger.Error(err, "StreamImages recv failed", "filter", filter, "itemsReceived", len(images)) diff --git a/staging/src/k8s.io/cri-client/pkg/remote_runtime.go b/staging/src/k8s.io/cri-client/pkg/remote_runtime.go index ef12c265246..919cd9d8480 100644 --- a/staging/src/k8s.io/cri-client/pkg/remote_runtime.go +++ b/staging/src/k8s.io/cri-client/pkg/remote_runtime.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "strings" + "sync/atomic" "time" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -51,9 +52,10 @@ type remoteRuntimeService struct { conn *grpc.ClientConn // useStreaming indicates whether to use streaming RPCs for list operations // when the CRIListStreaming feature gate is enabled. It falls back to false - // if a streaming RPC returns Unimplemented. The fallback is racy but kept - // simple since the worst case is a few extra fallback attempts. - useStreaming bool + // if a streaming RPC returns Unimplemented. Multiple goroutines may + // concurrently observe Unimplemented and store false, but that is harmless + // because the store is idempotent. + useStreaming atomic.Bool } const ( @@ -138,8 +140,8 @@ func NewRemoteRuntimeService(ctx context.Context, endpoint string, connectionTim timeout: connectionTimeout, logReduction: logreduction.NewLogReduction(identicalErrorDelay), conn: conn, - useStreaming: useStreaming, } + service.useStreaming.Store(useStreaming) if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil { return nil, fmt.Errorf("validate service connection: %w", err) @@ -330,7 +332,7 @@ func (r *remoteRuntimeService) ListPodSandbox(ctx context.Context, filter *runti ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() - if r.useStreaming { + if r.useStreaming.Load() { return r.streamPodSandboxesV1(ctx, filter) } return r.listPodSandboxV1(ctx, filter) @@ -373,7 +375,7 @@ func (r *remoteRuntimeService) streamPodSandboxesV1(ctx context.Context, filter // but when calling Recv() on the stream. if status.Code(err) == codes.Unimplemented { logger.Info("StreamPodSandboxes not implemented, falling back to ListPodSandbox", "filter", filter) - r.useStreaming = false + r.useStreaming.Store(false) return r.listPodSandboxV1(ctx, filter) } logger.Error(err, "StreamPodSandboxes recv failed", "filter", filter, "itemsReceived", len(items)) @@ -489,7 +491,7 @@ func (r *remoteRuntimeService) ListContainers(ctx context.Context, filter *runti ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() - if r.useStreaming { + if r.useStreaming.Load() { return r.streamContainersV1(ctx, filter) } return r.listContainersV1(ctx, filter) @@ -531,7 +533,7 @@ func (r *remoteRuntimeService) streamContainersV1(ctx context.Context, filter *r // but when calling Recv() on the stream. if status.Code(err) == codes.Unimplemented { logger.Info("StreamContainers not implemented, falling back to ListContainers", "filter", filter) - r.useStreaming = false + r.useStreaming.Store(false) return r.listContainersV1(ctx, filter) } logger.Error(err, "StreamContainers recv failed", "filter", filter, "itemsReceived", len(containers)) @@ -846,7 +848,7 @@ func (r *remoteRuntimeService) ListContainerStats(ctx context.Context, filter *r ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() - if r.useStreaming { + if r.useStreaming.Load() { return r.streamContainerStatsV1(ctx, filter) } return r.listContainerStatsV1(ctx, filter) @@ -888,7 +890,7 @@ func (r *remoteRuntimeService) streamContainerStatsV1(ctx context.Context, filte // but when calling Recv() on the stream. if status.Code(err) == codes.Unimplemented { logger.Info("StreamContainerStats not implemented, falling back to ListContainerStats", "filter", filter) - r.useStreaming = false + r.useStreaming.Store(false) return r.listContainerStatsV1(ctx, filter) } logger.Error(err, "StreamContainerStats recv failed", "filter", filter, "itemsReceived", len(stats)) @@ -937,7 +939,7 @@ func (r *remoteRuntimeService) ListPodSandboxStats(ctx context.Context, filter * ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() - if r.useStreaming { + if r.useStreaming.Load() { return r.streamPodSandboxStatsV1(ctx, filter) } return r.listPodSandboxStatsV1(ctx, filter) @@ -979,7 +981,7 @@ func (r *remoteRuntimeService) streamPodSandboxStatsV1(ctx context.Context, filt // but when calling Recv() on the stream. if status.Code(err) == codes.Unimplemented { logger.Info("StreamPodSandboxStats not implemented, falling back to ListPodSandboxStats", "filter", filter) - r.useStreaming = false + r.useStreaming.Store(false) return r.listPodSandboxStatsV1(ctx, filter) } logger.Error(err, "StreamPodSandboxStats recv failed", "filter", filter, "itemsReceived", len(stats)) @@ -1112,7 +1114,7 @@ func (r *remoteRuntimeService) ListPodSandboxMetrics(ctx context.Context) ([]*ru ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() - if r.useStreaming { + if r.useStreaming.Load() { return r.streamPodSandboxMetricsV1(ctx) } return r.listPodSandboxMetricsV1(ctx) @@ -1150,7 +1152,7 @@ func (r *remoteRuntimeService) streamPodSandboxMetricsV1(ctx context.Context) ([ // but when calling Recv() on the stream. if status.Code(err) == codes.Unimplemented { logger.Info("StreamPodSandboxMetrics not implemented, falling back to ListPodSandboxMetrics") - r.useStreaming = false + r.useStreaming.Store(false) return r.listPodSandboxMetricsV1(ctx) } logger.Error(err, "StreamPodSandboxMetrics recv failed", "itemsReceived", len(metrics))