mirror of
https://github.com/hashicorp/vault.git
synced 2026-06-09 00:33:28 -04:00
Make grpc plugin client use an atomic server value to fix a data race. (#4089)
Also add some coordination to ensure we don't try to clean up the grpc server before it's created/started
This commit is contained in:
parent
b334a3ead7
commit
65bd8dc8b0
2 changed files with 29 additions and 6 deletions
|
|
@ -3,6 +3,7 @@ package plugin
|
|||
import (
|
||||
"context"
|
||||
"net/rpc"
|
||||
"sync/atomic"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
|
|
@ -42,10 +43,17 @@ func (b BackendPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) err
|
|||
}
|
||||
|
||||
func (p *BackendPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
|
||||
return &backendGRPCPluginClient{
|
||||
ret := &backendGRPCPluginClient{
|
||||
client: pb.NewBackendClient(c),
|
||||
clientConn: c,
|
||||
broker: broker,
|
||||
cleanupCh: make(chan struct{}),
|
||||
doneCtx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Create the value and set the type
|
||||
ret.server = new(atomic.Value)
|
||||
ret.server.Store((*grpc.Server)(nil))
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package plugin
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
|
|
@ -28,8 +29,13 @@ type backendGRPCPluginClient struct {
|
|||
system logical.SystemView
|
||||
logger log.Logger
|
||||
|
||||
// This is used to signal to the Cleanup function that it can proceed
|
||||
// because we have a defined server
|
||||
cleanupCh chan struct{}
|
||||
|
||||
// server is the grpc server used for serving storage and sysview requests.
|
||||
server *grpc.Server
|
||||
server *atomic.Value
|
||||
|
||||
// clientConn is the underlying grpc connection to the server, we store it
|
||||
// so it can be cleaned up.
|
||||
clientConn *grpc.ClientConn
|
||||
|
|
@ -139,8 +145,16 @@ func (b *backendGRPCPluginClient) Cleanup(ctx context.Context) {
|
|||
defer cancel()
|
||||
|
||||
b.client.Cleanup(ctx, &pb.Empty{})
|
||||
if b.server != nil {
|
||||
b.server.GracefulStop()
|
||||
|
||||
// This will block until Setup has run the function to create a new server
|
||||
// in b.server. If we stop here before it has a chance to actually start
|
||||
// listening, when it starts listening it will immediatley error out and
|
||||
// exit, which is fine. Overall this ensures that we do not miss stopping
|
||||
// the server if it ends up being created after Cleanup is called.
|
||||
<-b.cleanupCh
|
||||
server := b.server.Load()
|
||||
if server != nil {
|
||||
server.(*grpc.Server).GracefulStop()
|
||||
}
|
||||
b.clientConn.Close()
|
||||
}
|
||||
|
|
@ -184,7 +198,8 @@ func (b *backendGRPCPluginClient) Setup(ctx context.Context, config *logical.Bac
|
|||
s := grpc.NewServer(opts...)
|
||||
pb.RegisterSystemViewServer(s, sysView)
|
||||
pb.RegisterStorageServer(s, storage)
|
||||
b.server = s
|
||||
b.server.Store(s)
|
||||
close(b.cleanupCh)
|
||||
return s
|
||||
}
|
||||
brokerID := b.broker.NextId()
|
||||
|
|
|
|||
Loading…
Reference in a new issue