Vendor deps

This commit is contained in:
Jeff Mitchell 2019-07-24 13:42:50 -04:00
parent 8bac81e209
commit ba5fe5d4f8
37 changed files with 983 additions and 382 deletions

View file

@ -17,6 +17,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
@ -74,9 +75,11 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -85,6 +88,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -98,6 +102,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
@ -105,7 +111,9 @@ google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107/go.mod h1:VzzqZJRn
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw=
gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4=
gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -95,7 +95,7 @@ func (s *Weighted) Release(n int64) {
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: bad release")
panic("semaphore: released more than held")
}
for {
next := s.waiters.Front()

View file

@ -3,15 +3,15 @@ language: go
matrix:
include:
- go: 1.12.x
env: GO111MODULE=on
- go: 1.11.x
env: VET=1 GO111MODULE=on
- go: 1.11.x
- go: 1.12.x
env: RACE=1 GO111MODULE=on
- go: 1.11.x
- go: 1.12.x
env: RUN386=1
- go: 1.11.x
- go: 1.12.x
env: GRPC_GO_RETRY=on
- go: 1.11.x
env: GO111MODULE=on
- go: 1.10.x
- go: 1.9.x
- go: 1.9.x

View file

@ -11,22 +11,46 @@ In order to protect both you and ourselves, you will need to sign the
## Guidelines for Pull Requests
How to get your contributions merged smoothly and quickly.
- Create **small PRs** that are narrowly focused on **addressing a single concern**. We often times receive PRs that are trying to fix several things at a time, but only one fix is considered acceptable, nothing gets merged and both author's & review's time is wasted. Create more PRs to address different concerns and everyone will be happy.
- For speculative changes, consider opening an issue and discussing it first. If you are suggesting a behavioral or API change, consider starting with a [gRFC proposal](https://github.com/grpc/proposal).
- Provide a good **PR description** as a record of **what** change is being made and **why** it was made. Link to a github issue if it exists.
- Don't fix code style and formatting unless you are already changing that line to address an issue. PRs with irrelevant changes won't be merged. If you do want to fix formatting or style, do that in a separate PR.
- Unless your PR is trivial, you should expect there will be reviewer comments that you'll need to address before merging. We expect you to be reasonably responsive to those comments, otherwise the PR will be closed after 2-3 weeks of inactivity.
- Maintain **clean commit history** and use **meaningful commit messages**. PRs with messy commit history are difficult to review and won't be merged. Use `rebase -i upstream/master` to curate your commit history and/or to bring in latest changes from master (but avoid rebasing in the middle of a code review).
- Keep your PR up to date with upstream/master (if there are merge conflicts, we can't really merge your change).
- **All tests need to be passing** before your change can be merged. We recommend you **run tests locally** before creating your PR to catch breakages early on.
- Create **small PRs** that are narrowly focused on **addressing a single
concern**. We often times receive PRs that are trying to fix several things at
a time, but only one fix is considered acceptable, nothing gets merged and
both author's & review's time is wasted. Create more PRs to address different
concerns and everyone will be happy.
- The grpc package should only depend on standard Go packages and a small number
of exceptions. If your contribution introduces new dependencies which are NOT
in the [list](https://godoc.org/google.golang.org/grpc?imports), you need a
discussion with gRPC-Go authors and consultants.
- For speculative changes, consider opening an issue and discussing it first. If
you are suggesting a behavioral or API change, consider starting with a [gRFC
proposal](https://github.com/grpc/proposal).
- Provide a good **PR description** as a record of **what** change is being made
and **why** it was made. Link to a github issue if it exists.
- Don't fix code style and formatting unless you are already changing that line
to address an issue. PRs with irrelevant changes won't be merged. If you do
want to fix formatting or style, do that in a separate PR.
- Unless your PR is trivial, you should expect there will be reviewer comments
that you'll need to address before merging. We expect you to be reasonably
responsive to those comments, otherwise the PR will be closed after 2-3 weeks
of inactivity.
- Maintain **clean commit history** and use **meaningful commit messages**. PRs
with messy commit history are difficult to review and won't be merged. Use
`rebase -i upstream/master` to curate your commit history and/or to bring in
latest changes from master (but avoid rebasing in the middle of a code
review).
- Keep your PR up to date with upstream/master (if there are merge conflicts, we
can't really merge your change).
- **All tests need to be passing** before your change can be merged. We
recommend you **run tests locally** before creating your PR to catch breakages
early on.
- `make all` to test everything, OR
- `make vet` to catch vet errors
- `make test` to run the tests
@ -34,4 +58,3 @@ How to get your contributions merged smoothly and quickly.
- optional `make testappengine` to run tests with appengine
- Exceptions to the rules can be made if there's a compelling reason for doing so.

View file

@ -1,42 +1,96 @@
# gRPC-Go
[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc) [![GoReportCard](https://goreportcard.com/badge/grpc/grpc-go)](https://goreportcard.com/report/github.com/grpc/grpc-go)
[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go)
[![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc)
[![GoReportCard](https://goreportcard.com/badge/grpc/grpc-go)](https://goreportcard.com/report/github.com/grpc/grpc-go)
The Go implementation of [gRPC](https://grpc.io/): A high performance, open source, general RPC framework that puts mobile and HTTP/2 first. For more information see the [gRPC Quick Start: Go](https://grpc.io/docs/quickstart/go.html) guide.
The Go implementation of [gRPC](https://grpc.io/): A high performance, open
source, general RPC framework that puts mobile and HTTP/2 first. For more
information see the [gRPC Quick Start:
Go](https://grpc.io/docs/quickstart/go.html) guide.
Installation
------------
To install this package, you need to install Go and setup your Go workspace on your computer. The simplest way to install the library is to run:
To install this package, you need to install Go and setup your Go workspace on
your computer. The simplest way to install the library is to run:
```
$ go get -u google.golang.org/grpc
```
With Go module support (Go 1.11+), simply `import "google.golang.org/grpc"` in
your source code and `go [build|run|test]` will automatically download the
necessary dependencies ([Go modules
ref](https://github.com/golang/go/wiki/Modules)).
If you are trying to access grpc-go from within China, please see the
[FAQ](#FAQ) below.
Prerequisites
-------------
gRPC-Go requires Go 1.9 or later.
Constraints
-----------
The grpc package should only depend on standard Go packages and a small number of exceptions. If your contribution introduces new dependencies which are NOT in the [list](https://godoc.org/google.golang.org/grpc?imports), you need a discussion with gRPC-Go authors and consultants.
Documentation
-------------
See [API documentation](https://godoc.org/google.golang.org/grpc) for package and API descriptions and find examples in the [examples directory](examples/).
- See [godoc](https://godoc.org/google.golang.org/grpc) for package and API
descriptions.
- Documentation on specific topics can be found in the [Documentation
directory](Documentation/).
- Examples can be found in the [examples directory](examples/).
Performance
-----------
See the current benchmarks for some of the languages supported in [this dashboard](https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5652536396611584&widget=490377658&container=1286539696).
Performance benchmark data for grpc-go and other languages is maintained in
[this
dashboard](https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5652536396611584&widget=490377658&container=1286539696).
Status
------
General Availability [Google Cloud Platform Launch Stages](https://cloud.google.com/terms/launch-stages).
General Availability [Google Cloud Platform Launch
Stages](https://cloud.google.com/terms/launch-stages).
FAQ
---
#### I/O Timeout Errors
The `golang.org` domain may be blocked from some countries. `go get` usually
produces an error like the following when this happens:
```
$ go get -u google.golang.org/grpc
package google.golang.org/grpc: unrecognized import path "google.golang.org/grpc" (https fetch: Get https://google.golang.org/grpc?go-get=1: dial tcp 216.239.37.1:443: i/o timeout)
```
To build Go code, there are several options:
- Set up a VPN and access google.golang.org through that.
- Without Go module support: `git clone` the repo manually:
```
git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc
```
You will need to do the same for all of grpc's dependencies in `golang.org`,
e.g. `golang.org/x/net`.
- With Go module support: it is possible to use the `replace` feature of `go
mod` to create aliases for golang.org packages. In your project's directory:
```
go mod edit -replace=google.golang.org/grpc=github.com/grpc/grpc-go@latest
go mod tidy
go mod vendor
go build -mod=vendor
```
Again, this will need to be done for all transitive dependencies hosted on
golang.org as well. Please refer to [this
issue](https://github.com/golang/go/issues/28652) in the golang repo regarding
this concern.
#### Compiling error, undefined: grpc.SupportPackageIsVersion
Please update proto package, gRPC package and rebuild the proto files:

View file

@ -22,6 +22,7 @@ package balancer
import (
"context"
"encoding/json"
"errors"
"net"
"strings"
@ -31,6 +32,7 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
var (
@ -39,7 +41,10 @@ var (
)
// Register registers the balancer builder to the balancer map. b.Name
// (lowercased) will be used as the name registered with this builder.
// (lowercased) will be used as the name registered with this builder. If the
// Builder implements ConfigParser, ParseConfig will be called when new service
// configs are received by the resolver, and the result will be provided to the
// Balancer in UpdateClientConnState.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Balancers are
@ -138,6 +143,8 @@ type ClientConn interface {
ResolveNow(resolver.ResolveNowOption)
// Target returns the dial target for this ClientConn.
//
// Deprecated: Use the Target field in the BuildOptions instead.
Target() string
}
@ -155,6 +162,10 @@ type BuildOptions struct {
Dialer func(context.Context, string) (net.Conn, error)
// ChannelzParentID is the entity parent's channelz unique identification number.
ChannelzParentID int64
// Target contains the parsed address info of the dial target. It is the same resolver.Target as
// passed to the resolver.
// See the documentation for the resolver.Target type for details about what it contains.
Target resolver.Target
}
// Builder creates a balancer.
@ -166,6 +177,14 @@ type Builder interface {
Name() string
}
// ConfigParser parses load balancer configs.
type ConfigParser interface {
// ParseConfig parses the JSON load balancer config provided into an
// internal form or returns an error if the config is invalid. For future
// compatibility reasons, unknown fields in the config should be ignored.
ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
}
// PickOptions contains addition information for the Pick operation.
type PickOptions struct {
// FullMethodName is the method name that NewClientStream() is called
@ -264,7 +283,7 @@ type Balancer interface {
// non-nil error to gRPC.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateResolverState will be called instead.
// UpdateClientConnState will be called instead.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
@ -277,14 +296,23 @@ type SubConnState struct {
// TODO: add last connection error
}
// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}
// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateResolverState method will be called instead
// of HandleResolvedAddrs and its UpdateSubConnState will be called instead of
// HandleSubConnStateChange.
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateResolverState is called by gRPC when the state of the resolver
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.
UpdateResolverState(resolver.State)
UpdateClientConnState(ClientConnState)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)

View file

@ -70,13 +70,13 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
panic("not implemented")
}
func (b *baseBalancer) UpdateResolverState(s resolver.State) {
// TODO: handle s.Err (log if not nil) once implemented.
// TODO: handle s.ServiceConfig?
grpclog.Infoln("base.baseBalancer: got new resolver state: ", s)
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.Addresses {
for _, a := range s.ResolverState.Addresses {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).

View file

@ -88,7 +88,7 @@ type ccBalancerWrapper struct {
cc *ClientConn
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolver.State
ccUpdateCh chan *balancer.ClientConnState
done chan struct{}
mu sync.Mutex
@ -99,7 +99,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolver.State, 1),
ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}),
}
@ -126,7 +126,7 @@ func (ccb *ccBalancerWrapper) watcher() {
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
case s := <-ccb.resolverUpdateCh:
case s := <-ccb.ccUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
@ -134,9 +134,9 @@ func (ccb *ccBalancerWrapper) watcher() {
default:
}
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateResolverState(*s)
ub.UpdateClientConnState(*s)
} else {
ccb.balancer.HandleResolvedAddrs(s.Addresses, nil)
ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
}
case <-ccb.done:
}
@ -151,9 +151,11 @@ func (ccb *ccBalancerWrapper) watcher() {
for acbw := range scs {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
default:
}
ccb.cc.firstResolveEvent.Fire()
}
}
@ -178,9 +180,10 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}
func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
s := ccs.ResolverState
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
@ -191,10 +194,10 @@ func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
}
}
select {
case <-ccb.resolverUpdateCh:
case <-ccb.ccUpdateCh:
default:
}
ccb.resolverUpdateCh <- &s
ccb.ccUpdateCh <- ccs
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {

View file

@ -20,7 +20,6 @@ package grpc
import (
"context"
"strings"
"sync"
"google.golang.org/grpc/balancer"
@ -34,13 +33,7 @@ type balancerWrapperBuilder struct {
}
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
targetAddr := cc.Target()
targetSplitted := strings.Split(targetAddr, ":///")
if len(targetSplitted) >= 2 {
targetAddr = targetSplitted[1]
}
bwb.b.Start(targetAddr, BalancerConfig{
bwb.b.Start(opts.Target.Endpoint, BalancerConfig{
DialCreds: opts.DialCreds,
Dialer: opts.Dialer,
})
@ -49,7 +42,7 @@ func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.B
balancer: bwb.b,
pickfirst: pickfirst,
cc: cc,
targetAddr: targetAddr,
targetAddr: opts.Target.Endpoint,
startCh: make(chan struct{}),
conns: make(map[resolver.Address]balancer.SubConn),
connSt: make(map[balancer.SubConn]*scState),
@ -120,7 +113,7 @@ func (bw *balancerWrapper) lbWatcher() {
}
for addrs := range notifyCh {
grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
grpclog.Infof("balancerWrapper: got update addr from Notify: %v", addrs)
if bw.pickfirst {
var (
oldA resolver.Address

View file

@ -45,6 +45,7 @@ import (
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
)
@ -137,6 +138,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
opt.apply(&cc.dopts)
}
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
defer func() {
if err != nil {
cc.Close()
@ -290,6 +294,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
// Build the resolver.
@ -327,6 +332,68 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return cc, nil
}
// chainUnaryClientInterceptors chains all unary client interceptors into one.
func chainUnaryClientInterceptors(cc *ClientConn) {
interceptors := cc.dopts.chainUnaryInts
// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
// be executed before any other chained interceptors.
if cc.dopts.unaryInt != nil {
interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
}
var chainedInt UnaryClientInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
}
}
cc.dopts.unaryInt = chainedInt
}
// getChainUnaryInvoker recursively generate the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
if curr == len(interceptors)-1 {
return finalInvoker
}
return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
}
}
// chainStreamClientInterceptors chains all stream client interceptors into one.
func chainStreamClientInterceptors(cc *ClientConn) {
interceptors := cc.dopts.chainStreamInts
// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
// be executed before any other chained interceptors.
if cc.dopts.streamInt != nil {
interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
}
var chainedInt StreamClientInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
}
}
cc.dopts.streamInt = chainedInt
}
// getChainStreamer recursively generate the chained client stream constructor.
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
if curr == len(interceptors)-1 {
return finalStreamer
}
return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
}
}
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
@ -466,24 +533,6 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
}
}
// gRPC should resort to default service config when:
// * resolver service config is disabled
// * or, resolver does not return a service config or returns an invalid one.
func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
if cc.dopts.disableServiceConfig {
return true
}
// The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
// Right now, we assume that empty service config string means resolver does not return a config.
if sc == "" {
return true
}
// TODO: the logic below is temporary. Once we finish the logic to validate service config
// in resolver, we will replace the logic below.
_, err := parseServiceConfig(sc)
return err != nil
}
func (cc *ClientConn) updateResolverState(s resolver.State) error {
cc.mu.Lock()
defer cc.mu.Unlock()
@ -494,54 +543,47 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
return nil
}
if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) {
if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
}
} else {
// TODO: the parsing logic below will be moved inside resolver.
sc, err := parseServiceConfig(s.ServiceConfig)
if err != nil {
return err
}
if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
cc.applyServiceConfig(sc)
}
}
// update the service config that will be sent to balancer.
if cc.sc != nil {
s.ServiceConfig = cc.sc.rawJSONString
} else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
cc.applyServiceConfig(sc)
}
var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var isGRPCLB bool
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
var newBalancerName string
// TODO: use new loadBalancerConfig field with appropriate priority.
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
balCfg = cc.sc.lbConfig.cfg
} else {
newBalancerName = PickFirstBalancerName
var isGRPCLB bool
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
cc.balancerWrapper.updateResolverState(s)
cc.firstResolveEvent.Fire()
cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
return nil
}
@ -554,7 +596,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
if strings.EqualFold(cc.curBalancerName, name) {
return
}
@ -693,6 +735,8 @@ func (ac *addrConn) connect() error {
ac.mu.Unlock()
return nil
}
// Update connectivity state within the lock to prevent subsequent or
// concurrent calls from resetting the transport more than once.
ac.updateConnectivityState(connectivity.Connecting)
ac.mu.Unlock()
@ -703,7 +747,16 @@ func (ac *addrConn) connect() error {
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// It checks whether current connected address of ac is in the new addrs list.
// If ac is Connecting, it returns false. The caller should tear down the ac and
// create a new one. Note that the backoff will be reset when this happens.
//
// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
// addresses will be picked up by retry in the next iteration after backoff.
//
// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
//
// If ac is Ready, it checks whether current connected address of ac is in the
// new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
@ -711,17 +764,18 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
ac.mu.Lock()
defer ac.mu.Unlock()
grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
if ac.state == connectivity.Shutdown {
if ac.state == connectivity.Shutdown ||
ac.state == connectivity.TransientFailure ||
ac.state == connectivity.Idle {
ac.addrs = addrs
return true
}
// Unless we're busy reconnecting already, let's reconnect from the top of
// the list.
if ac.state != connectivity.Ready {
if ac.state == connectivity.Connecting {
return false
}
// ac.state is Ready, try to find the connected address.
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
@ -970,6 +1024,9 @@ func (ac *addrConn) resetTransport() {
// The spec doesn't mention what should be done for multiple addresses.
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
connectDeadline := time.Now().Add(dialDuration)
ac.updateConnectivityState(connectivity.Connecting)
ac.transport = nil
ac.mu.Unlock()
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
@ -1012,27 +1069,8 @@ func (ac *addrConn) resetTransport() {
ac.transport = newTr
ac.backoffIdx = 0
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
hctx, hcancel := context.WithCancel(ac.ctx)
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.updateConnectivityState(connectivity.Ready)
}
ac.startHealthCheck(hctx)
ac.mu.Unlock()
// Block until the created transport is down. And when this happens,
@ -1066,8 +1104,6 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
ac.mu.Unlock()
return nil, resolver.Address{}, nil, errConnClosing
}
ac.updateConnectivityState(connectivity.Connecting)
ac.transport = nil
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
@ -1158,42 +1194,83 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
return newTr, reconnect, nil
}
func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
// Set up the health check helper functions
newStream := func() (interface{}, error) {
return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
var healthcheckManagingState bool
defer func() {
if !healthcheckManagingState {
ac.updateConnectivityState(connectivity.Ready)
}
}()
if ac.cc.dopts.disableHealthCheck {
return
}
firstReady := true
reportHealth := func(ok bool) {
healthCheckConfig := ac.cc.healthCheckConfig()
if healthCheckConfig == nil {
return
}
if !ac.scopts.HealthCheckEnabled {
return
}
healthCheckFunc := ac.cc.dopts.healthCheckFunc
if healthCheckFunc == nil {
// The health package is not imported to set health check function.
//
// TODO: add a link to the health check doc in the error message.
grpclog.Error("Health check is requested but health check function is not set.")
return
}
healthcheckManagingState = true
// Set up the health check helper functions.
currentTr := ac.transport
newStream := func(method string) (interface{}, error) {
ac.mu.Lock()
if ac.transport != currentTr {
ac.mu.Unlock()
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
}
ac.mu.Unlock()
return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
}
setConnectivityState := func(s connectivity.State) {
ac.mu.Lock()
defer ac.mu.Unlock()
if ac.transport != newTr {
if ac.transport != currentTr {
return
}
if ok {
if firstReady {
firstReady = false
ac.curAddr = addr
}
ac.updateConnectivityState(connectivity.Ready)
} else {
ac.updateConnectivityState(connectivity.TransientFailure)
}
ac.updateConnectivityState(s)
}
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
Severity: channelz.CtError,
})
// Start the health checking stream.
go func() {
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
Severity: channelz.CtError,
})
}
grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
}
}()
}
func (ac *addrConn) resetConnectBackoff() {

View file

@ -132,7 +132,8 @@ const (
// Unavailable indicates the service is currently unavailable.
// This is a most likely a transient condition and may be corrected
// by retrying with a backoff.
// by retrying with a backoff. Note that it is not always safe to retry
// non-idempotent operations.
//
// See litmus test above for deciding between FailedPrecondition,
// Aborted, and Unavailable.

View file

@ -278,24 +278,22 @@ type ChannelzSecurityValue interface {
// TLSChannelzSecurityValue defines the struct that TLS protocol should return
// from GetSecurityValue(), containing security info like cipher and certificate used.
type TLSChannelzSecurityValue struct {
ChannelzSecurityValue
StandardName string
LocalCertificate []byte
RemoteCertificate []byte
}
func (*TLSChannelzSecurityValue) isChannelzSecurityValue() {}
// OtherChannelzSecurityValue defines the struct that non-TLS protocol should return
// from GetSecurityValue(), which contains protocol specific security info. Note
// the Value field will be sent to users of channelz requesting channel info, and
// thus sensitive info should better be avoided.
type OtherChannelzSecurityValue struct {
ChannelzSecurityValue
Name string
Value proto.Message
}
func (*OtherChannelzSecurityValue) isChannelzSecurityValue() {}
var cipherSuiteLookup = map[uint16]string{
tls.TLS_RSA_WITH_RC4_128_SHA: "TLS_RSA_WITH_RC4_128_SHA",
tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: "TLS_RSA_WITH_3DES_EDE_CBC_SHA",

View file

@ -39,8 +39,12 @@ import (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
chainUnaryInts []UnaryClientInterceptor
chainStreamInts []StreamClientInterceptor
cp Compressor
dc Decompressor
bs backoff.Strategy
@ -414,6 +418,17 @@ func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
})
}
// WithChainUnaryInterceptor returns a DialOption that specifies the chained
// interceptor for unary RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All interceptors added by this method will be chained, and the interceptor
// defined by WithUnaryInterceptor will always be prepended to the chain.
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
})
}
// WithStreamInterceptor returns a DialOption that specifies the interceptor for
// streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
@ -422,6 +437,17 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
})
}
// WithChainStreamInterceptor returns a DialOption that specifies the chained
// interceptor for unary RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All interceptors added by this method will be chained, and the interceptor
// defined by WithStreamInterceptor will always be prepended to the chain.
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.chainStreamInts = append(o.chainStreamInts, interceptors...)
})
}
// WithAuthority returns a DialOption that specifies the value to be used as the
// :authority pseudo-header. This value only works with WithInsecure and has no
// effect if TransportCredentials are present.
@ -440,12 +466,12 @@ func WithChannelzParentID(id int64) DialOption {
})
}
// WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
// WithDisableServiceConfig returns a DialOption that causes gRPC to ignore any
// service config provided by the resolver and provides a hint to the resolver
// to not fetch service configs.
//
// Note that, this dial option only disables service config from resolver. If
// default service config is provided, grpc will use the default service config.
// Note that this dial option only disables service config from resolver. If
// default service config is provided, gRPC will use the default service config.
func WithDisableServiceConfig() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.disableServiceConfig = true

View file

@ -7,13 +7,13 @@ require (
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/mock v1.1.1
github.com/golang/protobuf v1.2.0
github.com/google/go-cmp v0.2.0
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3
golang.org/x/net v0.0.0-20190311183353-d8887717615a
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
golang.org/x/tools v0.0.0-20190311212946-11955173bddd
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135
google.golang.org/appengine v1.1.0 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc
)

12
vendor/google.golang.org/grpc/go.sum generated vendored
View file

@ -10,6 +10,8 @@ github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@ -17,17 +19,19 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJV
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099 h1:XJP7lxbSxWLOMNdBE4B/STaqVy6L73o0knwj2vIlxnw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
@ -51,7 +52,11 @@ func init() {
internal.HealthCheckFunc = clientHealthCheck
}
func clientHealthCheck(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), service string) error {
const healthCheckMethod = "/grpc.health.v1.Health/Watch"
// This function implements the protocol defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), service string) error {
tryCnt := 0
retryConnection:
@ -65,7 +70,8 @@ retryConnection:
if ctx.Err() != nil {
return nil
}
rawS, err := newStream()
setConnectivityState(connectivity.Connecting)
rawS, err := newStream(healthCheckMethod)
if err != nil {
continue retryConnection
}
@ -73,7 +79,7 @@ retryConnection:
s, ok := rawS.(grpc.ClientStream)
// Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes.
if !ok {
reportHealth(true)
setConnectivityState(connectivity.Ready)
return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS)
}
@ -89,19 +95,23 @@ retryConnection:
// Reports healthy for the LBing purposes if health check is not implemented in the server.
if status.Code(err) == codes.Unimplemented {
reportHealth(true)
setConnectivityState(connectivity.Ready)
return err
}
// Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED.
if err != nil {
reportHealth(false)
setConnectivityState(connectivity.TransientFailure)
continue retryConnection
}
// As a message has been received, removes the need for backoff for the next retry by reseting the try count.
tryCnt = 0
reportHealth(resp.Status == healthpb.HealthCheckResponse_SERVING)
if resp.Status == healthpb.HealthCheckResponse_SERVING {
setConnectivityState(connectivity.Ready)
} else {
setConnectivityState(connectivity.TransientFailure)
}
}
}
}

View file

@ -24,6 +24,7 @@
package channelz
import (
"fmt"
"sort"
"sync"
"sync/atomic"
@ -95,9 +96,14 @@ func (d *dbWrapper) get() *channelMap {
// NewChannelzStorage initializes channelz data storage and id generator.
//
// This function returns a cleanup function to wait for all channelz state to be reset by the
// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
// to remove some entity just register by the new test, since the id space is the same.
//
// Note: This function is exported for testing purpose only. User should not call
// it in most cases.
func NewChannelzStorage() {
func NewChannelzStorage() (cleanup func() error) {
db.set(&channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
@ -107,6 +113,28 @@ func NewChannelzStorage() {
subChannels: make(map[int64]*subChannel),
})
idGen.reset()
return func() error {
var err error
cm := db.get()
if cm == nil {
return nil
}
for i := 0; i < 1000; i++ {
cm.mu.Lock()
if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
cm.mu.Unlock()
// all things stored in the channelz map have been cleared.
return nil
}
cm.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
cm.mu.Lock()
err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
cm.mu.Unlock()
return err
}
}
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a

View file

@ -23,6 +23,8 @@ package internal
import (
"context"
"time"
"google.golang.org/grpc/connectivity"
)
var (
@ -37,10 +39,25 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
// ParseServiceConfig is a function to parse JSON service configs into
// opaque data structures.
ParseServiceConfig func(sc string) (interface{}, error)
// StatusRawProto is exported by status/status.go. This func returns a
// pointer to the wrapped Status proto for a given status.Status without a
// call to proto.Clone(). The returned Status proto should not be mutated by
// the caller.
StatusRawProto interface{} // func (*status.Status) *spb.Status
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
//
// The implementation is expected to create a health checking RPC stream by
// calling newStream(), watch for the health status of serviceName, and report
// it's health back by calling setConnectivityState().
//
// The health checking protocol is defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.

View file

@ -24,6 +24,7 @@
package transport
import (
"bytes"
"context"
"errors"
"fmt"
@ -347,7 +348,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
ht.stats.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
}
@ -361,7 +362,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
s.buf.put(recvMsg{data: buf[:n:n]})
s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
buf = buf[n:]
}
if err != nil {

View file

@ -117,6 +117,8 @@ type http2Client struct {
onGoAway func(GoAwayReason)
onClose func()
bufferPool *bufferPool
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@ -249,6 +251,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@ -367,6 +370,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
closeStream: func(err error) {
t.CloseStream(s, err)
},
freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@ -437,6 +441,15 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
var k string
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
for _, vv := range added {
for i, v := range vv {
if i%2 == 0 {
@ -450,15 +463,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
}
}
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
@ -549,7 +553,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s.write(recvMsg{err: err})
close(s.done)
// If headerChan isn't closed, then close it.
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}
@ -713,7 +717,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
s.write(recvMsg{err: err})
}
// If headerChan isn't closed, then close it.
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.noHeaders = true
close(s.headerChan)
}
@ -794,21 +798,21 @@ func (t *http2Client) Close() error {
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.
func (t *http2Client) GracefulClose() error {
func (t *http2Client) GracefulClose() {
t.mu.Lock()
// Make sure we move to draining only from active.
if t.state == draining || t.state == closing {
t.mu.Unlock()
return nil
return
}
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
return t.Close()
t.Close()
return
}
t.controlBuf.put(&incomingGoAway{})
return nil
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
@ -946,9 +950,10 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
}
}
// The server has closed the stream without sending trailers. Record that
@ -1142,26 +1147,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
endStream := frame.StreamEnded()
atomic.StoreUint32(&s.bytesReceived, 1)
initialHeader := atomic.SwapUint32(&s.headerDone, 1) == 0
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
if !initialHeader && !endStream {
// As specified by RFC 7540, a HEADERS frame (and associated CONTINUATION frames) can only appear
// at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
return
}
state := &decodeState{}
// Initialize isGRPC value to be !initialHeader, since if a gRPC ResponseHeader has been received
// which indicates peer speaking gRPC, we are in gRPC mode.
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if err := state.decodeHeader(frame); err != nil {
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
return
}
var isHeader bool
isHeader := false
defer func() {
if t.statsHandler != nil {
if isHeader {
@ -1180,10 +1183,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
}()
// If headers haven't been received yet.
if initialHeader {
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
if !endStream {
// Headers frame is ResponseHeader.
// HEADERS frame block carries a Response-Headers.
isHeader = true
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
@ -1192,14 +1195,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
}
close(s.headerChan)
return
} else {
// HEADERS frame block carries a Trailers-Only.
s.noHeaders = true
}
// Headers frame is Trailers-only.
s.noHeaders = true
close(s.headerChan)
}
if !endStream {
return
}
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)

View file

@ -35,9 +35,11 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
@ -55,6 +57,9 @@ var (
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
// statusRawProto is a function to get to the raw status proto wrapped in a
// status.Status without a proto.Clone().
statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
// http2Server implements the ServerTransport interface with HTTP2.
@ -119,6 +124,7 @@ type http2Server struct {
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
czData *channelzData
bufferPool *bufferPool
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@ -220,6 +226,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if dynamicWindow {
@ -405,9 +412,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@ -591,9 +599,10 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
@ -817,7 +826,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
if p := st.Proto(); p != nil && len(p.Details) > 0 {
if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
@ -1019,13 +1028,7 @@ func (t *http2Server) Close() error {
}
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState streamState) {
oldState = s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
return oldState
}
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
@ -1047,15 +1050,13 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState stream
atomic.AddInt64(&t.czData.streamsFailed, 1)
}
}
return oldState
}
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
oldState := t.deleteStream(s, eosReceived)
// If the stream is already closed, then don't put trailing header to controlbuf.
oldState := s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
return
}
@ -1063,14 +1064,18 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
streamID: s.id,
rst: rst,
rstCode: rstCode,
onWrite: func() {},
onWrite: func() {
t.deleteStream(s, eosReceived)
},
}
t.controlBuf.put(hdr)
}
// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
s.swapState(streamDone)
t.deleteStream(s, eosReceived)
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: rst,

View file

@ -22,6 +22,7 @@
package transport
import (
"bytes"
"context"
"errors"
"fmt"
@ -39,10 +40,32 @@ import (
"google.golang.org/grpc/tap"
)
type bufferPool struct {
pool sync.Pool
}
func newBufferPool() *bufferPool {
return &bufferPool{
pool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}
func (p *bufferPool) get() *bytes.Buffer {
return p.pool.Get().(*bytes.Buffer)
}
func (p *bufferPool) put(b *bytes.Buffer) {
p.pool.Put(b)
}
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
data []byte
buffer *bytes.Buffer
// nil: received some data
// io.EOF: stream is completed. data is nil.
// other non-nil error: transport failure. data is nil.
@ -117,8 +140,9 @@ type recvBufferReader struct {
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
last []byte // Stores the remaining data in the previous calls.
last *bytes.Buffer // Stores the remaining data in the previous calls.
err error
freeBuffer func(*bytes.Buffer)
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@ -128,10 +152,13 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
if r.last != nil && len(r.last) > 0 {
if r.last != nil {
// Read remaining data left in last call.
copied := copy(p, r.last)
r.last = r.last[copied:]
copied, _ := r.last.Read(p)
if r.last.Len() == 0 {
r.freeBuffer(r.last)
r.last = nil
}
return copied, nil
}
if r.closeStream != nil {
@ -170,8 +197,13 @@ func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error
if m.err != nil {
return 0, m.err
}
copied := copy(p, m.data)
r.last = m.data[copied:]
copied, _ := m.buffer.Read(p)
if m.buffer.Len() == 0 {
r.freeBuffer(m.buffer)
r.last = nil
} else {
r.last = m.buffer
}
return copied, nil
}
@ -204,8 +236,8 @@ type Stream struct {
// is used to adjust flow control, if needed.
requestRead func(int)
headerChan chan struct{} // closed to indicate the end of header metadata.
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@ -578,9 +610,12 @@ type ClientTransport interface {
// is called only once.
Close() error
// GracefulClose starts to tear down the transport. It stops accepting
// new RPCs and wait the completion of the pending RPCs.
GracefulClose() error
// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
// finished, the transport will close.
//
// It does not block.
GracefulClose()
// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.

View file

@ -17,9 +17,8 @@
*/
// Package naming defines the naming API and related data structures for gRPC.
// The interface is EXPERIMENTAL and may be subject to change.
//
// Deprecated: please use package resolver.
// This package is deprecated: please use package resolver instead.
package naming
// Operation defines the corresponding operations for a name resolution change.

View file

@ -120,6 +120,14 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
bp.mu.Unlock()
select {
case <-ctx.Done():
if connectionErr := bp.connectionError(); connectionErr != nil {
switch ctx.Err() {
case context.DeadlineExceeded:
return nil, nil, status.Errorf(codes.DeadlineExceeded, "latest connection error: %v", connectionErr)
case context.Canceled:
return nil, nil, status.Errorf(codes.Canceled, "latest connection error: %v", connectionErr)
}
}
return nil, nil, ctx.Err()
case <-ch:
}

64
vendor/google.golang.org/grpc/preloader.go generated vendored Normal file
View file

@ -0,0 +1,64 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// PreparedMsg is responsible for creating a Marshalled and Compressed object.
//
// This API is EXPERIMENTAL.
type PreparedMsg struct {
// Struct for preparing msg before sending them
encodedData []byte
hdr []byte
payload []byte
}
// Encode marshalls and compresses the message using the codec and compressor for the stream.
func (p *PreparedMsg) Encode(s Stream, msg interface{}) error {
ctx := s.Context()
rpcInfo, ok := rpcInfoFromContext(ctx)
if !ok {
return status.Errorf(codes.Internal, "grpc: unable to get rpcInfo")
}
// check if the context has the relevant information to prepareMsg
if rpcInfo.preloaderInfo == nil {
return status.Errorf(codes.Internal, "grpc: rpcInfo.preloaderInfo is nil")
}
if rpcInfo.preloaderInfo.codec == nil {
return status.Errorf(codes.Internal, "grpc: rpcInfo.preloaderInfo.codec is nil")
}
// prepare the msg
data, err := encode(rpcInfo.preloaderInfo.codec, msg)
if err != nil {
return err
}
p.encodedData = data
compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp)
if err != nil {
return err
}
p.hdr, p.payload = msgHeader(data, compData)
return nil
}

View file

@ -66,6 +66,9 @@ var (
var (
defaultResolver netResolver = net.DefaultResolver
// To prevent excessive re-resolution, we enforce a rate limit on DNS
// resolution requests.
minDNSResRate = 30 * time.Second
)
var customAuthorityDialler = func(authority string) func(ctx context.Context, network, address string) (net.Conn, error) {
@ -241,7 +244,13 @@ func (d *dnsResolver) watcher() {
return
case <-d.t.C:
case <-d.rn:
if !d.t.Stop() {
// Before resetting a timer, it should be stopped to prevent racing with
// reads on it's channel.
<-d.t.C
}
}
result, sc := d.lookup()
// Next lookup should happen within an interval defined by d.freq. It may be
// more often due to exponential retry on empty address list.
@ -254,6 +263,16 @@ func (d *dnsResolver) watcher() {
}
d.cc.NewServiceConfig(sc)
d.cc.NewAddress(result)
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
t := time.NewTimer(minDNSResRate)
select {
case <-t.C:
case <-d.ctx.Done():
t.Stop()
return
}
}
}

View file

@ -20,6 +20,10 @@
// All APIs in this package are experimental.
package resolver
import (
"google.golang.org/grpc/serviceconfig"
)
var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
@ -100,11 +104,12 @@ type BuildOption struct {
// State contains the current Resolver state relevant to the ClientConn.
type State struct {
Addresses []Address // Resolved addresses for the target
ServiceConfig string // JSON representation of the service config
Addresses []Address // Resolved addresses for the target
// ServiceConfig is the parsed service config; obtained from
// serviceconfig.Parse.
ServiceConfig serviceconfig.Config
// TODO: add Err error
// TODO: add ParsedServiceConfig interface{}
}
// ClientConn contains the callbacks for resolver to notify any updates
@ -132,6 +137,21 @@ type ClientConn interface {
// Target represents a target for gRPC, as specified in:
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// It is parsed from the target string that gets passed into Dial or DialContext by the user. And
// grpc passes it to the resolver and the balancer.
//
// If the target follows the naming spec, and the parsed scheme is registered with grpc, we will
// parse the target string according to the spec. e.g. "dns://some_authority/foo.bar" will be parsed
// into &Target{Scheme: "dns", Authority: "some_authority", Endpoint: "foo.bar"}
//
// If the target does not contain a scheme, we will apply the default scheme, and set the Target to
// be the full target string. e.g. "foo.bar" will be parsed into
// &Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "foo.bar"}.
//
// If the parsed scheme is not registered (i.e. no corresponding resolver available to resolve the
// endpoint), we set the Scheme to be the default scheme, and set the Endpoint to be the full target
// string. e.g. target string "unknown_scheme://authority/endpoint" will be parsed into
// &Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"}.
type Target struct {
Scheme string
Authority string

View file

@ -138,19 +138,22 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
return
}
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: sc})
c, err := parseServiceConfig(sc)
if err != nil {
return
}
ccr.curState.ServiceConfig = sc
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: c})
}
ccr.curState.ServiceConfig = c
ccr.cc.updateResolverState(ccr.curState)
}
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
if s.ServiceConfig == ccr.curState.ServiceConfig && (len(ccr.curState.Addresses) == 0) == (len(s.Addresses) == 0) {
return
}
var updates []string
if s.ServiceConfig != ccr.curState.ServiceConfig {
oldSC, oldOK := ccr.curState.ServiceConfig.(*ServiceConfig)
newSC, newOK := s.ServiceConfig.(*ServiceConfig)
if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
updates = append(updates, "service config updated")
}
if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {

View file

@ -694,14 +694,34 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
return nil
}
// Information about RPC
type rpcInfo struct {
failfast bool
failfast bool
preloaderInfo *compressorInfo
}
// Information about Preloader
// Responsible for storing codec, and compressors
// If stream (s) has context s.Context which stores rpcInfo that has non nil
// pointers to codec, and compressors, then we can use preparedMsg for Async message prep
// and reuse marshalled bytes
type compressorInfo struct {
codec baseCodec
cp Compressor
comp encoding.Compressor
}
type rpcInfoContextKey struct{}
func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context {
return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast})
func newContextWithRPCInfo(ctx context.Context, failfast bool, codec baseCodec, cp Compressor, comp encoding.Compressor) context.Context {
return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{
failfast: failfast,
preloaderInfo: &compressorInfo{
codec: codec,
cp: cp,
comp: comp,
},
})
}
func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {

View file

@ -86,11 +86,11 @@ type service struct {
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts options
opts serverOptions
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[io.Closer]bool
conns map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
@ -108,7 +108,7 @@ type Server struct {
czData *channelzData
}
type options struct {
type serverOptions struct {
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
@ -131,7 +131,7 @@ type options struct {
maxHeaderListSize *uint32
}
var defaultServerOptions = options{
var defaultServerOptions = serverOptions{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
@ -140,7 +140,33 @@ var defaultServerOptions = options{
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
type ServerOption interface {
apply(*serverOptions)
}
// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// This API is EXPERIMENTAL.
type EmptyServerOption struct{}
func (EmptyServerOption) apply(*serverOptions) {}
// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
f func(*serverOptions)
}
func (fdo *funcServerOption) apply(do *serverOptions) {
fdo.f(do)
}
func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
return &funcServerOption{
f: f,
}
}
// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
@ -148,9 +174,9 @@ type ServerOption func(*options)
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.writeBufferSize = s
}
})
}
// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
@ -159,25 +185,25 @@ func WriteBufferSize(s int) ServerOption {
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func ReadBufferSize(s int) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.readBufferSize = s
}
})
}
// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
}
})
}
// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
}
})
}
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
@ -187,25 +213,25 @@ func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
kp.Time = time.Second
}
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.keepaliveParams = kp
}
})
}
// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.keepalivePolicy = kep
}
})
}
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.codec = codec
}
})
}
// RPCCompressor returns a ServerOption that sets a compressor for outbound
@ -216,9 +242,9 @@ func CustomCodec(codec Codec) ServerOption {
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.cp = cp
}
})
}
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
@ -227,9 +253,9 @@ func RPCCompressor(cp Compressor) ServerOption {
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.dc = dc
}
})
}
// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
@ -243,73 +269,73 @@ func MaxMsgSize(m int) ServerOption {
// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.maxReceiveMessageSize = m
}
})
}
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.maxSendMessageSize = m
}
})
}
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.maxConcurrentStreams = n
}
})
}
// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.creds = c
}
})
}
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
}
})
}
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
if o.streamInt != nil {
panic("The stream server interceptor was already set and may not be reset.")
}
o.streamInt = i
}
})
}
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
if o.inTapHandle != nil {
panic("The tap handle was already set and may not be reset.")
}
o.inTapHandle = h
}
})
}
// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.statsHandler = h
}
})
}
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
@ -319,7 +345,7 @@ func StatsHandler(h stats.Handler) ServerOption {
// The handling function has full access to the Context of the request and the
// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.unknownStreamDesc = &StreamDesc{
StreamName: "unknown_service_handler",
Handler: streamHandler,
@ -327,7 +353,7 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
ClientStreams: true,
ServerStreams: true,
}
}
})
}
// ConnectionTimeout returns a ServerOption that sets the timeout for
@ -337,17 +363,17 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.connectionTimeout = d
}
})
}
// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
return func(o *options) {
return newFuncServerOption(func(o *serverOptions) {
o.maxHeaderListSize = &s
}
})
}
// NewServer creates a gRPC server which has no service registered and has not
@ -355,12 +381,12 @@ func MaxHeaderListSize(s uint32) ServerOption {
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
o(&opts)
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[io.Closer]bool),
conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
quit: make(chan struct{}),
done: make(chan struct{}),
@ -760,27 +786,27 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}
func (s *Server) addConn(c io.Closer) bool {
func (s *Server) addConn(st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
c.Close()
st.Close()
return false
}
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
c.(transport.ServerTransport).Drain()
st.Drain()
}
s.conns[c] = true
s.conns[st] = true
return true
}
func (s *Server) removeConn(c io.Closer) {
func (s *Server) removeConn(st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, c)
delete(s.conns, st)
s.cv.Broadcast()
}
}
@ -1397,8 +1423,8 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
for c := range s.conns {
c.(transport.ServerTransport).Drain()
for st := range s.conns {
st.Drain()
}
s.drain = true
}

View file

@ -25,8 +25,11 @@ import (
"strings"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/serviceconfig"
)
const maxInt = int(^uint(0) >> 1)
@ -61,6 +64,11 @@ type MethodConfig struct {
retryPolicy *retryPolicy
}
type lbConfig struct {
name string
cfg serviceconfig.LoadBalancingConfig
}
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
//
@ -68,10 +76,18 @@ type MethodConfig struct {
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
serviceconfig.Config
// LB is the load balancer the service providers recommends. The balancer
// specified via grpc.WithBalancer will override this. This is deprecated;
// lbConfigs is preferred. If lbConfig and LB are both present, lbConfig
// will be used.
LB *string
// lbConfig is the service config's load balancing configuration. If
// lbConfig and LB are both present, lbConfig will be used.
lbConfig *lbConfig
// Methods contains a map for the methods in this service. If there is an
// exact match for a method (i.e. /service/method) in the map, use the
// corresponding MethodConfig. If there's no exact match, look for the
@ -233,15 +249,27 @@ type jsonMC struct {
RetryPolicy *jsonRetryPolicy
}
type loadBalancingConfig map[string]json.RawMessage
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
LoadBalancingPolicy *string
LoadBalancingConfig *[]loadBalancingConfig
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}
func init() {
internal.ParseServiceConfig = func(sc string) (interface{}, error) {
return parseServiceConfig(sc)
}
}
func parseServiceConfig(js string) (*ServiceConfig, error) {
if len(js) == 0 {
return nil, fmt.Errorf("no JSON service config provided")
}
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
@ -255,10 +283,38 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
healthCheckConfig: rsc.HealthCheckConfig,
rawJSONString: js,
}
if rsc.LoadBalancingConfig != nil {
for i, lbcfg := range *rsc.LoadBalancingConfig {
if len(lbcfg) != 1 {
err := fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
grpclog.Warningf(err.Error())
return nil, err
}
var name string
var jsonCfg json.RawMessage
for name, jsonCfg = range lbcfg {
}
builder := balancer.Get(name)
if builder == nil {
continue
}
sc.lbConfig = &lbConfig{name: name}
if parser, ok := builder.(balancer.ConfigParser); ok {
var err error
sc.lbConfig.cfg, err = parser.ParseConfig(jsonCfg)
if err != nil {
return nil, fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)
}
} else if string(jsonCfg) != "{}" {
grpclog.Warningf("non-empty balancer configuration %q, but balancer does not implement ParseConfig", string(jsonCfg))
}
break
}
}
if rsc.MethodConfig == nil {
return &sc, nil
}
for _, m := range *rsc.MethodConfig {
if m.Name == nil {
continue
@ -299,11 +355,11 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
}
if sc.retryThrottling != nil {
if sc.retryThrottling.MaxTokens <= 0 ||
sc.retryThrottling.MaxTokens > 1000 ||
sc.retryThrottling.TokenRatio <= 0 {
// Illegal throttling config; disable throttling.
sc.retryThrottling = nil
if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
return nil, fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)
}
if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
return nil, fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)
}
}
return &sc, nil

View file

@ -0,0 +1,48 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package serviceconfig defines types and methods for operating on gRPC
// service configs.
//
// This package is EXPERIMENTAL.
package serviceconfig
import (
"google.golang.org/grpc/internal"
)
// Config represents an opaque data structure holding a service config.
type Config interface {
isConfig()
}
// LoadBalancingConfig represents an opaque data structure holding a load
// balancer config.
type LoadBalancingConfig interface {
isLoadBalancingConfig()
}
// Parse parses the JSON service config provided into an internal form or
// returns an error if the config is invalid.
func Parse(ServiceConfigJSON string) (Config, error) {
c, err := internal.ParseServiceConfig(ServiceConfigJSON)
if err != nil {
return nil, err
}
return c.(Config), err
}

View file

@ -36,8 +36,15 @@ import (
"github.com/golang/protobuf/ptypes"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
)
func init() {
internal.StatusRawProto = statusRawProto
}
func statusRawProto(s *Status) *spb.Status { return s.s }
// statusError is an alias of a status proto. It implements error and Status,
// and a nil statusError should never be returned by this package.
type statusError spb.Status

View file

@ -30,7 +30,6 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancerload"
@ -245,7 +244,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = trace.NewContext(ctx, trInfo.tr)
}
ctx = newContextWithRPCInfo(ctx, c.failFast)
ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
sh := cc.dopts.copts.StatsHandler
var beginTime time.Time
if sh != nil {
@ -677,15 +676,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if !cs.desc.ClientStreams {
cs.sentLast = true
}
data, err := encode(cs.codec, m)
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}
compData, err := compress(data, cs.cp, cs.comp)
if err != nil {
return err
}
hdr, payload := msgHeader(data, compData)
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
@ -967,19 +964,18 @@ func (a *csAttempt) finish(err error) {
a.mu.Unlock()
}
func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
ac.mu.Lock()
if ac.transport != t {
ac.mu.Unlock()
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
}
// transition to CONNECTING state when an attempt starts
if ac.state != connectivity.Connecting {
ac.updateConnectivityState(connectivity.Connecting)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
ac.mu.Unlock()
// newClientStream creates a ClientStream with the specified transport, on the
// given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
// of using ac.transpot.
//
// Main difference between this and ClientConn.NewStream:
// - no retry
// - no service config (or wait for service config)
// - no tracing or stats
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
if t == nil {
// TODO: return RPC error here?
return nil, errors.New("transport provided is nil")
@ -987,14 +983,6 @@ func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, metho
// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
c := &callInfo{}
for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}
c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
// Possible context leak:
// The cancel function for the child context we create will only be called
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
@ -1007,6 +995,13 @@ func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, metho
}
}()
for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}
c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
if err := setCallInfoCodec(c); err != nil {
return nil, err
}
@ -1039,6 +1034,7 @@ func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, metho
callHdr.Creds = c.creds
}
// Use a special addrConnStream to avoid retry.
as := &addrConnStream{
callHdr: callHdr,
ac: ac,
@ -1150,15 +1146,13 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) {
if !as.desc.ClientStreams {
as.sentLast = true
}
data, err := encode(as.codec, m)
// load hdr, payload, data
hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
if err != nil {
return err
}
compData, err := compress(data, as.cp, as.comp)
if err != nil {
return err
}
hdr, payld := msgHeader(data, compData)
// TODO(dfawley): should we be checking len(data) instead?
if len(payld) > *as.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
@ -1395,15 +1389,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.t.IncrMsgSent()
}
}()
data, err := encode(ss.codec, m)
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
if err != nil {
return err
}
compData, err := compress(data, ss.cp, ss.comp)
if err != nil {
return err
}
hdr, payload := msgHeader(data, compData)
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > ss.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
@ -1496,3 +1488,24 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
func MethodFromServerStream(stream ServerStream) (string, bool) {
return Method(stream.Context())
}
// prepareMsg returns the hdr, payload and data
// using the compressors passed or using the
// passed preparedmsg
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
if preparedMsg, ok := m.(*PreparedMsg); ok {
return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
}
// The input interface is not a prepared msg.
// Marshal and Compress the data at this point
data, err = encode(codec, m)
if err != nil {
return nil, nil, nil, err
}
compData, err := compress(data, cp, comp)
if err != nil {
return nil, nil, nil, err
}
hdr, payload = msgHeader(data, compData)
return hdr, payload, data, nil
}

View file

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.20.1"
const Version = "1.22.0"

View file

@ -75,7 +75,7 @@ git ls-files "*.go" | xargs grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO
# - Do not import math/rand for real library code. Use internal/grpcrand for
# thread safety.
git ls-files "*.go" | xargs grep -l '"math/rand"' 2>&1 | (! grep -v '^examples\|^stress\|grpcrand')
git ls-files "*.go" | xargs grep -l '"math/rand"' 2>&1 | (! grep -v '^examples\|^stress\|grpcrand\|wrr_test')
# - Ensure all ptypes proto packages are renamed when importing.
git ls-files "*.go" | (! xargs grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/")
@ -88,7 +88,7 @@ go list -f {{.Dir}} ./... | xargs go run test/go_vet/vet.go
gofmt -s -d -l . 2>&1 | fail_on_output
goimports -l . 2>&1 | (! grep -vE "(_mock|\.pb)\.go:") | fail_on_output
golint ./... 2>&1 | (! grep -vE "(_mock|\.pb)\.go:")
go tool vet -all .
go vet -all .
# - Check that generated proto files are up to date.
if [[ -z "${VET_SKIP_PROTO}" ]]; then

7
vendor/modules.txt vendored
View file

@ -367,7 +367,7 @@ github.com/hashicorp/vault-plugin-secrets-gcpkms
github.com/hashicorp/vault-plugin-secrets-kv
# github.com/hashicorp/vault/api v1.0.3-0.20190719145648-41d3939b1ff9 => ./api
github.com/hashicorp/vault/api
# github.com/hashicorp/vault/sdk v0.1.12-0.20190719143932-2c807ec75a51 => ./sdk
# github.com/hashicorp/vault/sdk v0.1.12-0.20190724154558-f57ccdd48b70 => ./sdk
github.com/hashicorp/vault/sdk/helper/salt
github.com/hashicorp/vault/sdk/helper/strutil
github.com/hashicorp/vault/sdk/helper/wrapping
@ -663,7 +663,7 @@ golang.org/x/oauth2/google
golang.org/x/oauth2/jwt
golang.org/x/oauth2/jws
golang.org/x/oauth2/clientcredentials
# golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
# golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sync/semaphore
# golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5
golang.org/x/sys/unix
@ -725,7 +725,7 @@ google.golang.org/genproto/googleapis/api/annotations
google.golang.org/genproto/googleapis/rpc/code
google.golang.org/genproto/googleapis/type/expr
google.golang.org/genproto/googleapis/api/httpbody
# google.golang.org/grpc v1.20.1
# google.golang.org/grpc v1.22.0
google.golang.org/grpc/grpclog
google.golang.org/grpc/codes
google.golang.org/grpc
@ -752,6 +752,7 @@ google.golang.org/grpc/peer
google.golang.org/grpc/resolver
google.golang.org/grpc/resolver/dns
google.golang.org/grpc/resolver/passthrough
google.golang.org/grpc/serviceconfig
google.golang.org/grpc/stats
google.golang.org/grpc/tap
google.golang.org/grpc/health