diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index b7910f4522b..48ee1708642 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -39,6 +39,8 @@ import ( "go.uber.org/zap/zapcore" "golang.org/x/time/rate" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/runtime" @@ -352,7 +354,42 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, Logger: etcd3ClientLogger, } - return kubernetes.New(cfg) + kClient, err := kubernetes.New(cfg) + if err != nil { + return nil, err + } + + if err := blockUntilReady(dialTimeout, kClient); err != nil { + return nil, err + } + + return kClient, nil +} + +func blockUntilReady(timeout time.Duration, client *kubernetes.Client) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + conn := client.Client.ActiveConnection() + if conn == nil { + return fmt.Errorf("no grpc connection") + } + + for { + currentState := conn.GetState() + if currentState == connectivity.Ready { + // ready, return + return nil + } + if currentState == connectivity.Idle { + // attempt connect + conn.Connect() + } + // wait for state change from currentState until context times out + if !conn.WaitForStateChange(ctx, currentState) { + return fmt.Errorf("etcd grpc connection not ready: %w", ctx.Err()) + } + } } type runningCompactor struct {