Teach Vault how to register with Consul

Vault will now register itself with Consul.  The active node can be found using `active.vault.service.consul`.  All standby vaults are available via `standby.vault.service.consul`.  All unsealed vaults are considered healthy and available via `vault.service.consul`.  Change in status and registration is event driven and should happen at the speed of a write to Consul (~network RTT + ~1x fsync(2)).

Healthy/active:

```
curl -X GET 'http://127.0.0.1:8500/v1/health/service/vault?pretty' && echo;
[
    {
        "Node": {
            "Node": "vm1",
            "Address": "127.0.0.1",
            "TaggedAddresses": {
                "wan": "127.0.0.1"
            },
            "CreateIndex": 3,
            "ModifyIndex": 20
        },
        "Service": {
            "ID": "vault:127.0.0.1:8200",
            "Service": "vault",
            "Tags": [
                "active"
            ],
            "Address": "127.0.0.1",
            "Port": 8200,
            "EnableTagOverride": false,
            "CreateIndex": 17,
            "ModifyIndex": 20
        },
        "Checks": [
            {
                "Node": "vm1",
                "CheckID": "serfHealth",
                "Name": "Serf Health Status",
                "Status": "passing",
                "Notes": "",
                "Output": "Agent alive and reachable",
                "ServiceID": "",
                "ServiceName": "",
                "CreateIndex": 3,
                "ModifyIndex": 3
            },
            {
                "Node": "vm1",
                "CheckID": "vault-sealed-check",
                "Name": "Vault Sealed Status",
                "Status": "passing",
                "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
                "Output": "",
                "ServiceID": "vault:127.0.0.1:8200",
                "ServiceName": "vault",
                "CreateIndex": 19,
                "ModifyIndex": 19
            }
        ]
    }
]
```

Healthy/standby:

```
[snip]
        "Service": {
            "ID": "vault:127.0.0.2:8200",
            "Service": "vault",
            "Tags": [
                "standby"
            ],
            "Address": "127.0.0.2",
            "Port": 8200,
            "EnableTagOverride": false,
            "CreateIndex": 17,
            "ModifyIndex": 20
        },
        "Checks": [
            {
                "Node": "vm2",
                "CheckID": "serfHealth",
                "Name": "Serf Health Status",
                "Status": "passing",
                "Notes": "",
                "Output": "Agent alive and reachable",
                "ServiceID": "",
                "ServiceName": "",
                "CreateIndex": 3,
                "ModifyIndex": 3
            },
            {
                "Node": "vm2",
                "CheckID": "vault-sealed-check",
                "Name": "Vault Sealed Status",
                "Status": "passing",
                "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
                "Output": "",
                "ServiceID": "vault:127.0.0.2:8200",
                "ServiceName": "vault",
                "CreateIndex": 19,
                "ModifyIndex": 19
            }
        ]
    }
]
```

Sealed:

```
        "Checks": [
            {
                "Node": "vm2",
                "CheckID": "serfHealth",
                "Name": "Serf Health Status",
                "Status": "passing",
                "Notes": "",
                "Output": "Agent alive and reachable",
                "ServiceID": "",
                "ServiceName": "",
                "CreateIndex": 3,
                "ModifyIndex": 3
            },
            {
                "Node": "vm2",
                "CheckID": "vault-sealed-check",
                "Name": "Vault Sealed Status",
                "Status": "critical",
                "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
                "Output": "Vault Sealed",
                "ServiceID": "vault:127.0.0.2:8200",
                "ServiceName": "vault",
                "CreateIndex": 19,
                "ModifyIndex": 38
            }
        ]
```
This commit is contained in:
Sean Chittenden 2016-04-23 17:15:05 -07:00
parent 0d3ce59542
commit c0bbeba5ad
4 changed files with 673 additions and 9 deletions

View file

@ -203,6 +203,9 @@ func (c *ServerCommand) Run(args []string) int {
if envAA := os.Getenv("VAULT_ADVERTISE_ADDR"); envAA != "" {
coreConfig.AdvertiseAddr = envAA
if consulBackend, ok := (backend).(*physical.ConsulBackend); ok {
consulBackend.UpdateAdvertiseAddr(envAA)
}
}
// Attempt to detect the advertise address, if possible
@ -220,6 +223,9 @@ func (c *ServerCommand) Run(args []string) int {
c.Ui.Error("Failed to detect advertise address.")
} else {
coreConfig.AdvertiseAddr = advertise
if consulBackend, ok := (backend).(*physical.ConsulBackend); ok {
consulBackend.UpdateAdvertiseAddr(advertise)
}
}
}

View file

@ -3,8 +3,11 @@ package physical
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
"crypto/tls"
@ -12,18 +15,50 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/go-cleanhttp"
)
const (
// checkJitterFactor specifies the jitter factor used to stagger checks
checkJitterFactor = 16
// checkMinBuffer specifies provides a guarantee that a check will not
// be executed too close to the TTL check timeout
checkMinBuffer = 100 * time.Millisecond
// defaultCheckTimeout changes the timeout of TTL checks
defaultCheckTimeout = 5 * time.Second
// defaultCheckInterval specifies the default interval used to send
// checks
defaultCheckInterval = 4 * time.Second
// defaultServiceName is the default Consul service name used when
// advertising a Vault instance.
defaultServiceName = "vault"
)
// ConsulBackend is a physical backend that stores data at specific
// prefix within Consul. It is used for most production situations as
// it allows Vault to run on multiple machines in a highly-available manner.
type ConsulBackend struct {
path string
client *api.Client
kv *api.KV
permitPool *PermitPool
path string
client *api.Client
kv *api.KV
permitPool *PermitPool
serviceLock sync.RWMutex
service *api.AgentServiceRegistration
sealedCheck *api.AgentCheckRegistration
advertiseAddr string
consulClientConf *api.Config
serviceName string
running bool
active bool
sealed bool
checkTimeout time.Duration
checkTimer *time.Timer
}
// newConsulBackend constructs a Consul backend using the given API client
@ -43,6 +78,28 @@ func newConsulBackend(conf map[string]string) (Backend, error) {
path = strings.TrimPrefix(path, "/")
}
// Get the service name to advertise in Consul
service, ok := conf["service"]
if !ok {
service = defaultServiceName
}
checkTimeout := defaultCheckTimeout
checkTimeoutStr, ok := conf["check_timeout"]
if ok {
d, err := time.ParseDuration(checkTimeoutStr)
if err != nil {
return nil, err
}
min, _ := lib.DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor)
if d < min {
return nil, fmt.Errorf("Consul check_timeout must be greater than %v", min)
}
checkTimeout = d
}
// Configure the client
consulConf := api.DefaultConfig()
@ -84,14 +141,234 @@ func newConsulBackend(conf map[string]string) (Backend, error) {
// Setup the backend
c := &ConsulBackend{
path: path,
client: client,
kv: client.KV(),
permitPool: NewPermitPool(maxParInt),
path: path,
client: client,
kv: client.KV(),
permitPool: NewPermitPool(maxParInt),
consulClientConf: consulConf,
serviceName: service,
checkTimeout: checkTimeout,
checkTimer: time.NewTimer(checkTimeout),
}
return c, nil
}
// UpdateAdvertiseAddr provides a pre-initialization hook for updating
// Consul's advertise address.
func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error {
if c.running {
return fmt.Errorf("service registration unable to update advertise address, backend already running")
}
url, err := url.Parse(addr)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse URL "%v": {{err}}`, addr), err)
}
_, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
}
_, err = strconv.ParseInt(portStr, 10, 0)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse port "%v": {{err}}`, portStr), err)
}
c.advertiseAddr = addr
return nil
}
// serviceTags returns all of the relevant tags for Consul. Assumes
// c.serviceLock held for writing.
func serviceTags(active bool) []string {
activeTag := "standby"
if active {
activeTag = "active"
}
return []string{activeTag}
}
func (c *ConsulBackend) AdvertiseActive(active bool) error {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
// Vault is still bootstrapping
if c.service == nil {
return nil
}
c.service.Tags = serviceTags(active)
agent := c.client.Agent()
if err := agent.ServiceRegister(c.service); err != nil {
return errwrap.Wrapf("service registration failed: {{err}}", err)
}
// Save a cached copy of the active state: no way to query Core
c.active = active
return nil
}
func (c *ConsulBackend) AdvertiseSealed(sealed bool) error {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
c.sealed = sealed
// Vault is still bootstrapping
if c.service == nil {
return nil
}
// Push a TTL check immediately to update the state
c.runCheck()
return nil
}
func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err error) {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
if c.running {
return fmt.Errorf("service registration routine already running")
}
url, err := url.Parse(c.advertiseAddr)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err)
}
host, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
}
port, err := strconv.ParseInt(portStr, 10, 0)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err)
}
serviceID, err := c.serviceID()
if err != nil {
return err
}
c.service = &api.AgentServiceRegistration{
ID: serviceID,
Name: c.serviceName,
Tags: serviceTags(c.active),
Port: int(port),
Address: host,
EnableTagOverride: false,
}
checkStatus := "failing"
if !c.sealed {
checkStatus = "passing"
}
c.sealedCheck = &api.AgentCheckRegistration{
ID: c.checkID(),
Name: "Vault Sealed Status",
Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
ServiceID: serviceID,
AgentServiceCheck: api.AgentServiceCheck{
TTL: c.checkTimeout.String(),
Status: checkStatus,
},
}
agent := c.client.Agent()
if err := agent.ServiceRegister(c.service); err != nil {
return errwrap.Wrapf("service registration failed: {{err}}", err)
}
if err := agent.CheckRegister(c.sealedCheck); err != nil {
return errwrap.Wrapf("service registration check registration failed: {{err}}", err)
}
go c.checkRunner(shutdownCh)
c.running = true
// Deregister upon shutdown
go func() {
shutdown:
for {
select {
case <-shutdownCh:
// wtb logger: log.Printf("[DEBUG]: Shutting down consul backend")
break shutdown
}
}
if err := agent.ServiceDeregister(serviceID); err != nil {
// wtb logger: log.Printf("[WARNING]: service deregistration failed: {{err}}", err)
}
c.running = false
}()
return nil
}
// checkRunner periodically runs TTL checks
func (c *ConsulBackend) checkRunner(shutdownCh ShutdownChannel) {
defer c.checkTimer.Stop()
for {
select {
case <-c.checkTimer.C:
go func() {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
c.runCheck()
}()
case <-shutdownCh:
return
}
}
}
// runCheck immediately pushes a TTL check. Assumes c.serviceLock is held
// exclusively.
func (c *ConsulBackend) runCheck() {
// Reset timer before calling run check in order to not slide the
// window of the next check.
c.checkTimer.Reset(lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor))
// Run a TTL check
agent := c.client.Agent()
if !c.sealed {
agent.UpdateTTL(c.checkID(), "Vault Unsealed", api.HealthPassing)
} else {
agent.UpdateTTL(c.checkID(), "Vault Sealed", api.HealthCritical)
}
}
// checkID returns the ID used for a Consul Check. Assume at least a read
// lock is held.
func (c *ConsulBackend) checkID() string {
return "vault-sealed-check"
}
// serviceID returns the Vault ServiceID for use in Consul. Assume at least
// a read lock is held.
func (c *ConsulBackend) serviceID() (string, error) {
url, err := url.Parse(c.advertiseAddr)
if err != nil {
return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err)
}
host, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
}
port, err := strconv.ParseInt(portStr, 10, 0)
if err != nil {
return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err)
}
return fmt.Sprintf("%s:%s:%d", c.serviceName, host, int(port)), nil
}
func setupTLSConfig(conf map[string]string) (*tls.Config, error) {
serverName := strings.Split(conf["address"], ":")

View file

@ -3,12 +3,373 @@ package physical
import (
"fmt"
"os"
"reflect"
"testing"
"time"
"github.com/hashicorp/consul/api"
)
type consulConf map[string]string
var (
addrCount int = 0
)
func testHostIP() string {
a := addrCount
addrCount++
return fmt.Sprintf("127.0.0.%d", a)
}
func testConsulBackend(t *testing.T) *ConsulBackend {
return testConsulBackendConfig(t, &consulConf{})
}
func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend {
const serviceID = "vaultTestService"
be, err := newConsulBackend(*conf)
if err != nil {
t.Fatalf("Expected Consul to initialize: %v", err)
}
c, ok := be.(*ConsulBackend)
if !ok {
t.Fatalf("Expected ConsulBackend")
}
c.service = &api.AgentServiceRegistration{
ID: serviceID,
Name: c.serviceName,
Tags: serviceTags(c.active),
Port: 8200,
Address: testHostIP(),
EnableTagOverride: false,
}
c.sealedCheck = &api.AgentCheckRegistration{
ID: c.checkID(),
Name: "Vault Sealed Status",
Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
ServiceID: serviceID,
AgentServiceCheck: api.AgentServiceCheck{
TTL: c.checkTimeout.String(),
Status: api.HealthPassing,
},
}
return c
}
func testConsul_testConsulBackend(t *testing.T) {
c := testConsulBackend(t)
if c == nil {
t.Fatalf("bad")
}
if c.active != false {
t.Fatalf("bad")
}
if c.sealed != false {
t.Fatalf("bad")
}
if c.service == nil {
t.Fatalf("bad")
}
if c.sealedCheck == nil {
t.Fatalf("bad")
}
}
func TestConsul_newConsulBackend(t *testing.T) {
tests := []struct {
Name string
Config map[string]string
Fail bool
checkTimeout time.Duration
path string
service string
address string
scheme string
token string
max_parallel int
}{
{
Name: "Valid default config",
Config: map[string]string{},
checkTimeout: 5 * time.Second,
path: "vault",
service: "vault",
address: "127.0.0.1",
scheme: "http",
token: "",
max_parallel: 4,
},
{
Name: "Valid modified config",
Config: map[string]string{
"path": "seaTech/",
"service": "astronomy",
"check_timeout": "6s",
"address": "127.0.0.2",
"scheme": "https",
"token": "deadbeef-cafeefac-deadc0de-feedface",
"max_parallel": "4",
},
checkTimeout: 6 * time.Second,
path: "seaTech/",
service: "astronomy",
address: "127.0.0.2",
scheme: "https",
token: "deadbeef-cafeefac-deadc0de-feedface",
max_parallel: 4,
},
{
Config: map[string]string{
"check_timeout": "99ms",
},
Fail: true,
},
}
for _, test := range tests {
be, err := newConsulBackend(test.Config)
if test.Fail && err == nil {
t.Fatalf("Expected config %s to fail", test.Name)
} else if !test.Fail && err != nil {
t.Fatalf("Expected config %s to not fail: %v", test.Name, err)
}
c, ok := be.(*ConsulBackend)
if !ok {
t.Fatalf("Expected ConsulBackend")
}
if test.checkTimeout != c.checkTimeout {
t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout)
}
if test.path != c.path {
t.Errorf("bad: %v != %v", test.path, c.path)
}
if test.service != c.serviceName {
t.Errorf("bad: %v != %v", test.service, c.serviceName)
}
if test.address != c.consulClientConf.Address {
t.Errorf("bad: %v != %v", test.address, c.consulClientConf.Address)
}
if test.scheme != c.consulClientConf.Scheme {
t.Errorf("bad: %v != %v", test.scheme, c.consulClientConf.Scheme)
}
if test.token != c.consulClientConf.Token {
t.Errorf("bad: %v != %v", test.token, c.consulClientConf.Token)
}
// FIXME(sean@): Unable to test max_parallel
// if test.max_parallel != cap(c.permitPool) {
// t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool))
// }
}
}
func TestConsul_serviceTags(t *testing.T) {
tests := []struct {
active bool
tags []string
}{
{
active: true,
tags: []string{"active"},
},
{
active: false,
tags: []string{"standby"},
},
}
for _, test := range tests {
tags := serviceTags(test.active)
if !reflect.DeepEqual(tags[:], test.tags[:]) {
t.Errorf("Bad %v: %v %v", test.active, tags, test.tags)
}
}
}
func TestConsul_UpdateAdvertiseAddr(t *testing.T) {
tests := []struct {
addr string
pass bool
}{
{
addr: "http://127.0.0.1:8200/",
pass: true,
},
{
addr: "http://127.0.0.1:8200",
pass: true,
},
{
addr: "127.0.0.1:8200",
pass: false,
},
{
addr: "127.0.0.1",
pass: false,
},
}
for _, test := range tests {
c := testConsulBackend(t)
if c == nil {
t.Fatalf("bad")
}
err := c.UpdateAdvertiseAddr(test.addr)
if test.pass {
if err != nil {
t.Fatalf("bad: %v", err)
}
} else {
if err == nil {
t.Fatalf("bad, expected fail")
} else {
continue
}
}
if c.advertiseAddr != test.addr {
t.Fatalf("bad: %v != %v", c.advertiseAddr, test.addr)
}
}
}
func TestConsul_AdvertiseActive(t *testing.T) {
c := testConsulBackend(t)
if c.active != false {
t.Fatalf("bad")
}
if err := c.AdvertiseActive(true); err != nil {
t.Fatalf("bad: %v", err)
}
if err := c.AdvertiseActive(true); err != nil {
t.Fatalf("bad: %v", err)
}
if err := c.AdvertiseActive(false); err != nil {
t.Fatalf("bad: %v", err)
}
if err := c.AdvertiseActive(false); err != nil {
t.Fatalf("bad: %v", err)
}
if err := c.AdvertiseActive(true); err != nil {
t.Fatalf("bad: %v", err)
}
}
func TestConsul_AdvertiseSealed(t *testing.T) {
c := testConsulBackend(t)
if c.sealed != false {
t.Fatalf("bad")
}
if err := c.AdvertiseSealed(true); err != nil {
t.Fatalf("bad: %v", err)
}
if c.sealed != true {
t.Fatalf("bad")
}
if err := c.AdvertiseSealed(true); err != nil {
t.Fatalf("bad: %v", err)
}
if c.sealed != true {
t.Fatalf("bad")
}
if err := c.AdvertiseSealed(false); err != nil {
t.Fatalf("bad: %v", err)
}
if c.sealed != false {
t.Fatalf("bad")
}
if err := c.AdvertiseSealed(false); err != nil {
t.Fatalf("bad: %v", err)
}
if c.sealed != false {
t.Fatalf("bad")
}
if err := c.AdvertiseSealed(true); err != nil {
t.Fatalf("bad: %v", err)
}
if c.sealed != true {
t.Fatalf("bad")
}
}
func TestConsul_checkID(t *testing.T) {
c := testConsulBackend(t)
if c.checkID() != "vault-sealed-check" {
t.Errorf("bad")
}
}
func TestConsul_serviceID(t *testing.T) {
passingTests := []struct {
advertiseAddr string
serviceName string
expected string
}{
{
advertiseAddr: "http://127.0.0.1:8200",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
},
{
advertiseAddr: "http://127.0.0.1:8200/",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
},
{
advertiseAddr: "https://127.0.0.1:8200/",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
},
}
for _, test := range passingTests {
c := testConsulBackendConfig(t, &consulConf{
"service": test.serviceName,
})
if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil {
t.Fatalf("bad: %v", err)
}
serviceID, err := c.serviceID()
if err != nil {
t.Fatalf("bad: %v", err)
}
if serviceID != test.expected {
t.Fatalf("bad: %v != %v", serviceID, test.expected)
}
}
}
func TestConsulBackend(t *testing.T) {
addr := os.Getenv("CONSUL_ADDR")
if addr == "" {

View file

@ -10,12 +10,32 @@ import (
// servicing Consul TTL Checks in advance of the TTL.
func DurationMinusBuffer(intv time.Duration, buffer time.Duration, jitter int64) time.Duration {
d := intv - buffer
d -= RandomStagger(time.Duration(int64(d) / jitter))
if jitter == 0 {
d -= RandomStagger(d)
} else {
d -= RandomStagger(time.Duration(int64(d) / jitter))
}
return d
}
// DurationMinusBufferDomain returns the domain of valid durations from a
// call to DurationMinusBuffer. This function is used to check user
// specified input values to DurationMinusBuffer.
func DurationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter int64) (min time.Duration, max time.Duration) {
max = intv - buffer
if jitter == 0 {
min = max
} else {
min = max - time.Duration(int64(max)/jitter)
}
return min, max
}
// Returns a random stagger interval between 0 and the duration
func RandomStagger(intv time.Duration) time.Duration {
if intv == 0 {
return 0
}
return time.Duration(uint64(rand.Int63()) % uint64(intv))
}