From c0bbeba5adf837bbdab4bdb954f071a2524cf549 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 17:15:05 -0700 Subject: [PATCH] Teach Vault how to register with Consul Vault will now register itself with Consul. The active node can be found using `active.vault.service.consul`. All standby vaults are available via `standby.vault.service.consul`. All unsealed vaults are considered healthy and available via `vault.service.consul`. Change in status and registration is event driven and should happen at the speed of a write to Consul (~network RTT + ~1x fsync(2)). Healthy/active: ``` curl -X GET 'http://127.0.0.1:8500/v1/health/service/vault?pretty' && echo; [ { "Node": { "Node": "vm1", "Address": "127.0.0.1", "TaggedAddresses": { "wan": "127.0.0.1" }, "CreateIndex": 3, "ModifyIndex": 20 }, "Service": { "ID": "vault:127.0.0.1:8200", "Service": "vault", "Tags": [ "active" ], "Address": "127.0.0.1", "Port": 8200, "EnableTagOverride": false, "CreateIndex": 17, "ModifyIndex": 20 }, "Checks": [ { "Node": "vm1", "CheckID": "serfHealth", "Name": "Serf Health Status", "Status": "passing", "Notes": "", "Output": "Agent alive and reachable", "ServiceID": "", "ServiceName": "", "CreateIndex": 3, "ModifyIndex": 3 }, { "Node": "vm1", "CheckID": "vault-sealed-check", "Name": "Vault Sealed Status", "Status": "passing", "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", "Output": "", "ServiceID": "vault:127.0.0.1:8200", "ServiceName": "vault", "CreateIndex": 19, "ModifyIndex": 19 } ] } ] ``` Healthy/standby: ``` [snip] "Service": { "ID": "vault:127.0.0.2:8200", "Service": "vault", "Tags": [ "standby" ], "Address": "127.0.0.2", "Port": 8200, "EnableTagOverride": false, "CreateIndex": 17, "ModifyIndex": 20 }, "Checks": [ { "Node": "vm2", "CheckID": "serfHealth", "Name": "Serf Health Status", "Status": "passing", "Notes": "", "Output": "Agent alive and reachable", "ServiceID": "", "ServiceName": "", 
"CreateIndex": 3, "ModifyIndex": 3 }, { "Node": "vm2", "CheckID": "vault-sealed-check", "Name": "Vault Sealed Status", "Status": "passing", "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", "Output": "", "ServiceID": "vault:127.0.0.2:8200", "ServiceName": "vault", "CreateIndex": 19, "ModifyIndex": 19 } ] } ] ``` Sealed: ``` "Checks": [ { "Node": "vm2", "CheckID": "serfHealth", "Name": "Serf Health Status", "Status": "passing", "Notes": "", "Output": "Agent alive and reachable", "ServiceID": "", "ServiceName": "", "CreateIndex": 3, "ModifyIndex": 3 }, { "Node": "vm2", "CheckID": "vault-sealed-check", "Name": "Vault Sealed Status", "Status": "critical", "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", "Output": "Vault Sealed", "ServiceID": "vault:127.0.0.2:8200", "ServiceName": "vault", "CreateIndex": 19, "ModifyIndex": 38 } ] ``` --- command/server.go | 6 + physical/consul.go | 293 +++++++++++++- physical/consul_test.go | 361 ++++++++++++++++++ .../hashicorp/consul/lib/cluster.go | 22 +- 4 files changed, 673 insertions(+), 9 deletions(-) diff --git a/command/server.go b/command/server.go index 0af41cddd2..87b5575019 100644 --- a/command/server.go +++ b/command/server.go @@ -203,6 +203,9 @@ func (c *ServerCommand) Run(args []string) int { if envAA := os.Getenv("VAULT_ADVERTISE_ADDR"); envAA != "" { coreConfig.AdvertiseAddr = envAA + if consulBackend, ok := (backend).(*physical.ConsulBackend); ok { + consulBackend.UpdateAdvertiseAddr(envAA) + } } // Attempt to detect the advertise address, if possible @@ -220,6 +223,9 @@ func (c *ServerCommand) Run(args []string) int { c.Ui.Error("Failed to detect advertise address.") } else { coreConfig.AdvertiseAddr = advertise + if consulBackend, ok := (backend).(*physical.ConsulBackend); ok { + consulBackend.UpdateAdvertiseAddr(advertise) + } } } diff --git a/physical/consul.go b/physical/consul.go index 
00a59af3b5..4a639a982d 100644
--- a/physical/consul.go
+++ b/physical/consul.go
@@ -3,8 +3,11 @@ package physical
 import (
 	"fmt"
 	"io/ioutil"
+	"net"
+	"net/url"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"crypto/tls"
@@ -12,18 +15,50 @@ import (
 
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/errwrap"
 	"github.com/hashicorp/go-cleanhttp"
 )
 
+const (
+	// checkJitterFactor specifies the jitter factor used to stagger checks
+	checkJitterFactor = 16
+
+	// checkMinBuffer provides a guarantee that a check will not be
+	// executed too close to the TTL check timeout
+	checkMinBuffer = 100 * time.Millisecond
+
+	// defaultCheckTimeout is the TTL used for checks when none is configured
+	defaultCheckTimeout = 5 * time.Second
+
+	// defaultCheckInterval specifies the default interval used to send
+	// checks. NOTE(review): not referenced by the code visible in this patch.
+	defaultCheckInterval = 4 * time.Second
+
+	// defaultServiceName is the default Consul service name used when
+	// advertising a Vault instance.
+	defaultServiceName = "vault"
+)
+
 // ConsulBackend is a physical backend that stores data at specific
 // prefix within Consul. It is used for most production situations as
 // it allows Vault to run on multiple machines in a highly-available manner.
 type ConsulBackend struct {
-	path       string
-	client     *api.Client
-	kv         *api.KV
-	permitPool *PermitPool
+	path             string
+	client           *api.Client
+	kv               *api.KV
+	permitPool       *PermitPool
+	serviceLock      sync.RWMutex                  // taken by the service-discovery methods
+	service          *api.AgentServiceRegistration // nil while Vault is still bootstrapping
+	sealedCheck      *api.AgentCheckRegistration   // TTL check reporting the sealed status
+	advertiseAddr    string                        // advertise URL, set via UpdateAdvertiseAddr
+	consulClientConf *api.Config                   // client configuration assembled by newConsulBackend
+	serviceName      string                        // Consul service name ("service" option)
+	running          bool                          // set once RunServiceDiscovery has started
+	active           bool                          // cached active/standby state
+	sealed           bool                          // cached sealed state
+	checkTimeout     time.Duration                 // TTL of the sealed check ("check_timeout" option)
+	checkTimer       *time.Timer                   // fires when the next TTL update is due
 }
 
 // newConsulBackend constructs a Consul backend using the given API client
@@ -43,6 +78,28 @@ func newConsulBackend(conf map[string]string) (Backend, error) {
 		path = strings.TrimPrefix(path, "/")
 	}
 
+	// Get the service name to advertise in Consul
+	service, ok := conf["service"]
+	if !ok {
+		service = defaultServiceName
+	}
+
+	checkTimeout := defaultCheckTimeout
+	checkTimeoutStr, ok := conf["check_timeout"]
+	if ok {
+		d, err := time.ParseDuration(checkTimeoutStr)
+		if err != nil {
+			return nil, err
+		}
+		// The shortest jittered refresh interval must not drop below the buffer.
+		min, _ := lib.DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor)
+		if min < checkMinBuffer {
+			return nil, fmt.Errorf("Consul check_timeout must be greater than %v", min)
+		}
+
+		checkTimeout = d
+	}
+
 	// Configure the client
 	consulConf := api.DefaultConfig()
 
@@ -84,14 +141,234 @@ func newConsulBackend(conf map[string]string) (Backend, error) {
 
 	// Setup the backend
 	c := &ConsulBackend{
-		path:       path,
-		client:     client,
-		kv:         client.KV(),
-		permitPool: NewPermitPool(maxParInt),
+		path:             path,
+		client:           client,
+		kv:               client.KV(),
+		permitPool:       NewPermitPool(maxParInt),
+		consulClientConf: consulConf,
+		serviceName:      service,
+		checkTimeout:     checkTimeout,
+		checkTimer:       time.NewTimer(checkTimeout),
 	}
 	return c, nil
 }
 
+// UpdateAdvertiseAddr provides a pre-initialization hook for updating
+// Consul's advertise address.
+func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error {
+	if c.running {
+		return fmt.Errorf("service registration unable to update advertise address, backend already running")
+	}
+
+	u, err := url.Parse(addr)
+	if err != nil {
+		return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse URL "%v": {{err}}`, addr), err)
+	}
+
+	_, portStr, err := net.SplitHostPort(u.Host)
+	if err != nil {
+		return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to find a host:port in advertise address "%v": {{err}}`, u.Host), err)
+	}
+	if _, err := strconv.ParseInt(portStr, 10, 0); err != nil {
+		return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse port "%v": {{err}}`, portStr), err)
+	}
+
+	c.advertiseAddr = addr
+	return nil
+}
+
+// serviceTags returns the Consul service tags for the given HA state:
+// "active" for the active node and "standby" otherwise.
+func serviceTags(active bool) []string {
+	if active {
+		return []string{"active"}
+	}
+	return []string{"standby"}
+}
+
+// AdvertiseActive caches whether this node is the active Vault and, once
+// the service exists, re-registers it in Consul with the matching tag.
+func (c *ConsulBackend) AdvertiseActive(active bool) error {
+	c.serviceLock.Lock()
+	defer c.serviceLock.Unlock()
+
+	// Save a cached copy of the active state: no way to query Core. Cache it
+	// first so a pre-bootstrap change still reaches RunServiceDiscovery.
+	c.active = active
+
+	// Vault is still bootstrapping
+	if c.service == nil {
+		return nil
+	}
+
+	c.service.Tags = serviceTags(active)
+	if err := c.client.Agent().ServiceRegister(c.service); err != nil {
+		return errwrap.Wrapf("service registration failed: {{err}}", err)
+	}
+
+	return nil
+}
+
+// AdvertiseSealed caches the seal state and pushes a TTL check to Consul.
+func (c *ConsulBackend) AdvertiseSealed(sealed bool) error {
+	c.serviceLock.Lock()
+	defer c.serviceLock.Unlock()
+	c.sealed = sealed
+
+	// Vault is still bootstrapping
+	if c.service == nil {
+		return nil
+	}
+
+	// Push a TTL check immediately to update the state
+	c.runCheck()
+
+	return nil
+}
+
+// RunServiceDiscovery registers the service and its sealed-status TTL check
+// with Consul, starts the TTL loop, and deregisters once shutdownCh fires.
+func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err error) {
+	c.serviceLock.Lock()
+	defer c.serviceLock.Unlock()
+
+	if c.running {
+		return fmt.Errorf("service registration routine already running")
+	}
+
+	u, err := url.Parse(c.advertiseAddr)
+	if err != nil {
+		return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err)
+	}
+
+	host, portStr, err := net.SplitHostPort(u.Host)
+	if err != nil {
+		return errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, u.Host), err)
+	}
+	port, err := strconv.ParseInt(portStr, 10, 0)
+	if err != nil {
+		return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err)
+	}
+
+	serviceID, err := c.serviceID()
+	if err != nil {
+		return err
+	}
+
+	c.service = &api.AgentServiceRegistration{
+		ID:                serviceID,
+		Name:              c.serviceName,
+		Tags:              serviceTags(c.active),
+		Port:              int(port),
+		Address:           host,
+		EnableTagOverride: false,
+	}
+
+	// Initial status must be a valid Consul state: critical while sealed.
+	checkStatus := api.HealthCritical
+	if !c.sealed {
+		checkStatus = api.HealthPassing
+	}
+
+	c.sealedCheck = &api.AgentCheckRegistration{
+		ID:        c.checkID(),
+		Name:      "Vault Sealed Status",
+		Notes:     "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
+		ServiceID: serviceID,
+		AgentServiceCheck: api.AgentServiceCheck{
+			TTL:    c.checkTimeout.String(),
+			Status: checkStatus,
+		},
+	}
+
+	agent := c.client.Agent()
+	if err := agent.ServiceRegister(c.service); err != nil {
+		return errwrap.Wrapf("service registration failed: {{err}}", err)
+	}
+
+	if err := agent.CheckRegister(c.sealedCheck); err != nil {
+		return errwrap.Wrapf("service registration check registration failed: {{err}}", err)
+	}
+
+	go c.checkRunner(shutdownCh)
+	c.running = true
+
+	// Deregister upon shutdown
+	go func() {
+		<-shutdownCh
+		// wtb logger: log.Printf("[DEBUG]: Shutting down consul backend")
+		if err := agent.ServiceDeregister(serviceID); err != nil {
+			// wtb logger: log.Printf("[WARNING]: service deregistration failed: {{err}}", err)
+		}
+		// c.running is read under serviceLock, so clear it under the lock too.
+		c.serviceLock.Lock()
+		c.running = false
+		c.serviceLock.Unlock()
+	}()
+
+	return nil
+}
+
+// checkRunner periodically runs TTL checks
+func (c *ConsulBackend) checkRunner(shutdownCh ShutdownChannel) {
+	defer c.checkTimer.Stop()
+
+	for {
+		select {
+		case <-c.checkTimer.C:
+			go func() {
+				c.serviceLock.Lock()
+				defer c.serviceLock.Unlock()
+				c.runCheck()
+			}()
+		case <-shutdownCh:
+			return
+		}
+	}
+}
+
+// runCheck immediately pushes a TTL check. Assumes c.serviceLock is held
+// exclusively.
+func (c *ConsulBackend) runCheck() {
+	// Reset timer before calling run check in order to not slide the
+	// window of the next check.
+	c.checkTimer.Reset(lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor))
+
+	// Run a TTL check
+	agent := c.client.Agent()
+	if !c.sealed {
+		agent.UpdateTTL(c.checkID(), "Vault Unsealed", api.HealthPassing)
+	} else {
+		agent.UpdateTTL(c.checkID(), "Vault Sealed", api.HealthCritical)
+	}
+}
+
+// checkID returns the ID used for a Consul Check. Assume at least a read
+// lock is held.
+func (c *ConsulBackend) checkID() string {
+	return "vault-sealed-check"
+}
+
+// serviceID returns the Vault ServiceID for use in Consul. Assume at least
+// a read lock is held.
+func (c *ConsulBackend) serviceID() (string, error) {
+	u, err := url.Parse(c.advertiseAddr)
+	if err != nil {
+		return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err)
+	}
+
+	host, portStr, err := net.SplitHostPort(u.Host)
+	if err != nil {
+		return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, u.Host), err)
+	}
+	port, err := strconv.ParseInt(portStr, 10, 0)
+	if err != nil {
+		return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err)
+	}
+
+	return fmt.Sprintf("%s:%s:%d", c.serviceName, host, int(port)), nil
+}
+
 func setupTLSConfig(conf map[string]string) (*tls.Config, error) {
 	serverName := strings.Split(conf["address"], ":")
 
diff --git a/physical/consul_test.go b/physical/consul_test.go
index b4d5d6bb9d..b343de0356 100644
--- a/physical/consul_test.go
+++ b/physical/consul_test.go
@@ -3,12 +3,373 @@ package physical
 import (
 	"fmt"
 	"os"
+	"reflect"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/consul/api"
 )
 
+type consulConf map[string]string
+
+var addrCount int // last host octet handed out by testHostIP
+
+func testHostIP() string {
+	addrCount++
+	return fmt.Sprintf("127.0.0.%d", addrCount)
+}
+
+func testConsulBackend(t *testing.T) *ConsulBackend {
+	return testConsulBackendConfig(t, &consulConf{})
+}
+
+func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend {
+	const serviceID = "vaultTestService"
+	be, err := newConsulBackend(*conf)
+	if err != nil {
+		t.Fatalf("Expected Consul to initialize: %v", err)
+	}
+
+	c, ok := be.(*ConsulBackend)
+	if !ok {
+		t.Fatalf("Expected ConsulBackend")
+	}
+
+	c.service = &api.AgentServiceRegistration{
+		ID:                serviceID,
+		Name:              c.serviceName,
+		Tags:              serviceTags(c.active),
+		Port:              8200,
+		Address:           testHostIP(),
+		EnableTagOverride: false,
+	}
+
+	c.sealedCheck = &api.AgentCheckRegistration{
+		ID:        c.checkID(),
+		Name:      "Vault Sealed Status",
+		Notes:     "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
+		ServiceID: serviceID,
+		AgentServiceCheck: api.AgentServiceCheck{
+			TTL:    c.checkTimeout.String(),
+			Status: api.HealthPassing,
+		},
+	}
+
+	return c
+}
+
+func TestConsul_testConsulBackend(t *testing.T) {
+	c := testConsulBackend(t)
+	if c == nil {
+		t.Fatalf("bad")
+	}
+
+	if c.active != false {
+		t.Fatalf("bad")
+	}
+
+	if c.sealed != false {
+		t.Fatalf("bad")
+	}
+
+	if c.service == nil {
+		t.Fatalf("bad")
+	}
+
+	if c.sealedCheck == nil {
+		t.Fatalf("bad")
+	}
+}
+
+func TestConsul_newConsulBackend(t *testing.T) {
+	tests := []struct {
+		Name         string
+		Config       map[string]string
+		Fail         bool
+		checkTimeout time.Duration
+		path         string
+		service      string
+		address      string
+		scheme       string
+		token        string
+		max_parallel int
+	}{
+		{
+			Name:         "Valid default config",
+			Config:       map[string]string{},
+			checkTimeout: 5 * time.Second,
+			path:         "vault",
+			service:      "vault",
+			address:      "127.0.0.1",
+			scheme:       "http",
+			token:        "",
+			max_parallel: 4,
+		},
+		{
+			Name: "Valid modified config",
+			Config: map[string]string{
+				"path":          "seaTech/",
+				"service":       "astronomy",
+				"check_timeout": "6s",
+				"address":       "127.0.0.2",
+				"scheme":        "https",
+				"token":         "deadbeef-cafeefac-deadc0de-feedface",
+				"max_parallel":  "4",
+			},
+			checkTimeout: 6 * time.Second,
+			path:         "seaTech/",
+			service:      "astronomy",
+			address:      "127.0.0.2",
+			scheme:       "https",
+			token:        "deadbeef-cafeefac-deadc0de-feedface",
+			max_parallel: 4,
+		},
+		{
+			Name: "Check timeout too short",
+			Config: map[string]string{
+				"check_timeout": "99ms",
+			},
+			Fail: true,
+		},
+	}
+
+	for _, test := range tests {
+		be, err := newConsulBackend(test.Config)
+		if test.Fail {
+			if err == nil {
+				t.Fatalf("Expected config %s to fail", test.Name)
+			}
+			continue // nothing to inspect: no backend is returned on failure
+		}
+		if err != nil {
+			t.Fatalf("Expected config %s to not fail: %v", test.Name, err)
+		}
+
+		c, ok := be.(*ConsulBackend)
+		if !ok {
+			t.Fatalf("Expected ConsulBackend")
+		}
+
+		if test.checkTimeout != c.checkTimeout {
+			t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout)
+		}
+
+		if test.path != c.path {
+			t.Errorf("bad: %v != %v", test.path, c.path)
+		}
+
+		if test.service != c.serviceName {
+			t.Errorf("bad: %v != %v", test.service, c.serviceName)
+		}
+
+		if test.address != c.consulClientConf.Address {
+			t.Errorf("bad: %v != %v", test.address, c.consulClientConf.Address)
+		}
+
+		if test.scheme != c.consulClientConf.Scheme {
+			t.Errorf("bad: %v != %v", test.scheme, c.consulClientConf.Scheme)
+		}
+
+		if test.token != c.consulClientConf.Token {
+			t.Errorf("bad: %v != %v", test.token, c.consulClientConf.Token)
+		}
+
+		// FIXME(sean@): Unable to test max_parallel
+		// if test.max_parallel != cap(c.permitPool) {
+		// 	t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool))
+		// }
+	}
+}
+
+func TestConsul_serviceTags(t *testing.T) {
+	tests := []struct {
+		active bool
+		tags   []string
+	}{
+		{
+			active: true,
+			tags:   []string{"active"},
+		},
+		{
+			active: false,
+			tags:   []string{"standby"},
+		},
+	}
+
+	for _, test := range tests {
+		tags := serviceTags(test.active)
+		if !reflect.DeepEqual(tags, test.tags) {
+			t.Errorf("Bad %v: %v %v", test.active, tags, test.tags)
+		}
+	}
+}
+
+func TestConsul_UpdateAdvertiseAddr(t *testing.T) {
+	tests := []struct {
+		addr string
+		pass bool
+	}{
+		{
+			addr: "http://127.0.0.1:8200/",
+			pass: true,
+		},
+		{
+			addr: "http://127.0.0.1:8200",
+			pass: true,
+		},
+		{
+			addr: "127.0.0.1:8200",
+			pass: false,
+		},
+		{
+			addr: "127.0.0.1",
+			pass: false,
+		},
+	}
+	for _, test := range tests {
+		c := testConsulBackend(t)
+		if c == nil {
+			t.Fatalf("bad")
+		}
+
+		err := c.UpdateAdvertiseAddr(test.addr)
+		if !test.pass {
+			if err == nil {
+				t.Fatalf("bad, expected fail")
+			}
+			continue
+		}
+		if err != nil {
+			t.Fatalf("bad: %v", err)
+		}
+
+		if c.advertiseAddr != test.addr {
+			t.Fatalf("bad: %v != %v", c.advertiseAddr, test.addr)
+		}
+	}
+}
+
+func TestConsul_AdvertiseActive(t *testing.T) {
+	c := testConsulBackend(t)
+
+	if c.active != false {
+		t.Fatalf("bad")
+	}
+
+	if err := c.AdvertiseActive(true); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+
+	if err := c.AdvertiseActive(true); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+
+	if err := c.AdvertiseActive(false); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+
+	if err := c.AdvertiseActive(false); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+
+	if err := c.AdvertiseActive(true); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+}
+
+func TestConsul_AdvertiseSealed(t *testing.T) {
+	c := testConsulBackend(t)
+
+	if c.sealed != false {
+		t.Fatalf("bad")
+	}
+
+	if err := c.AdvertiseSealed(true); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+	if c.sealed != true {
+		t.Fatalf("bad")
+	}
+
+	if err := c.AdvertiseSealed(true); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+	if c.sealed != true {
+		t.Fatalf("bad")
+	}
+
+	if err := c.AdvertiseSealed(false); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+	if c.sealed != false {
+		t.Fatalf("bad")
+	}
+
+	if err := c.AdvertiseSealed(false); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+	if c.sealed != false {
+		t.Fatalf("bad")
+	}
+
+	if err := c.AdvertiseSealed(true); err != nil {
+		t.Fatalf("bad: %v", err)
+	}
+	if c.sealed != true {
+		t.Fatalf("bad")
+	}
+}
+
+func TestConsul_checkID(t *testing.T) {
+	c := testConsulBackend(t)
+	if c.checkID() != "vault-sealed-check" {
+		t.Errorf("bad")
+	}
+}
+
+func TestConsul_serviceID(t *testing.T) {
+	passingTests := []struct {
+		advertiseAddr string
+		serviceName   string
+		expected      string
+	}{
+		{
+			advertiseAddr: "http://127.0.0.1:8200",
+			serviceName:   "sea-tech-astronomy",
+			expected:      "sea-tech-astronomy:127.0.0.1:8200",
+		},
+		{
+			advertiseAddr: "http://127.0.0.1:8200/",
+			serviceName:   "sea-tech-astronomy",
+			expected:      "sea-tech-astronomy:127.0.0.1:8200",
+		},
+		{
+			advertiseAddr: "https://127.0.0.1:8200/",
+			serviceName:   "sea-tech-astronomy",
+			expected:      "sea-tech-astronomy:127.0.0.1:8200",
+		},
+	}
+
+	for _, test := range passingTests {
+		c := testConsulBackendConfig(t, &consulConf{
+			"service": test.serviceName,
+		})
+
+		if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil {
+			t.Fatalf("bad: %v", err)
+		}
+
+		serviceID, err := c.serviceID()
+		if err != nil {
+			t.Fatalf("bad: %v", err)
+		}
+
+		if serviceID != test.expected {
+			t.Fatalf("bad: %v != %v", serviceID, test.expected)
+		}
+	}
+}
+
 func TestConsulBackend(t *testing.T) {
 	addr := os.Getenv("CONSUL_ADDR")
 	if addr == "" {
diff --git a/vendor/github.com/hashicorp/consul/lib/cluster.go b/vendor/github.com/hashicorp/consul/lib/cluster.go
index 5df39251d8..a95232c573 100644
--- a/vendor/github.com/hashicorp/consul/lib/cluster.go
+++ b/vendor/github.com/hashicorp/consul/lib/cluster.go
@@ -10,12 +10,32 @@ import (
 // servicing Consul TTL Checks in advance of the TTL.
 func DurationMinusBuffer(intv time.Duration, buffer time.Duration, jitter int64) time.Duration {
 	d := intv - buffer
-	d -= RandomStagger(time.Duration(int64(d) / jitter))
+	if jitter == 0 {
+		d -= RandomStagger(d)
+	} else {
+		d -= RandomStagger(time.Duration(int64(d) / jitter))
+	}
 	return d
 }
 
+// DurationMinusBufferDomain returns the domain of valid durations from a
+// call to DurationMinusBuffer. This function is used to check user
+// specified input values to DurationMinusBuffer.
+func DurationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter int64) (min time.Duration, max time.Duration) {
+	max = intv - buffer
+	if jitter == 0 {
+		min = max
+	} else {
+		min = max - time.Duration(int64(max)/jitter)
+	}
+	return min, max
+}
+
 // Returns a random stagger interval between 0 and the duration
 func RandomStagger(intv time.Duration) time.Duration {
+	if intv == 0 {
+		return 0
+	}
 	return time.Duration(uint64(rand.Int63()) % uint64(intv))
 }
 