From 025b995f24e5b7254fdb96d40b45c88f7f01bf20 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 26 Apr 2016 15:37:13 -0700 Subject: [PATCH 1/7] Fix logger output Pointed out by: ryanuber --- physical/consul.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/physical/consul.go b/physical/consul.go index c5aacf9acf..30c1afb7b3 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -313,7 +313,7 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertis } if err := agent.ServiceDeregister(serviceID); err != nil { - c.logger.Printf("[WARN]: consul: service deregistration failed: {{err}}", err) + c.logger.Printf("[WARN]: consul: service deregistration failed: %v", err) } c.running = false }() From eedd7f0c3949f867b12d1a5b5f80c6db8fdb4da8 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 28 Apr 2016 10:56:41 -0700 Subject: [PATCH 2/7] Change the interface of ServiceDiscovery Instead of passing state, signal that the state has changed and provide a callback handler that can query Core. --- command/server.go | 16 +++++++++++++++- physical/physical.go | 20 +++++++++++++------- vault/core.go | 32 ++++++++++++-------------------- 3 files changed, 40 insertions(+), 28 deletions(-) diff --git a/command/server.go b/command/server.go index 10b94d1e1b..88212dea85 100644 --- a/command/server.go +++ b/command/server.go @@ -290,7 +290,21 @@ func (c *ServerCommand) Run(args []string) int { if coreConfig.HAPhysical != nil { sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) if ok { - if err := sd.RunServiceDiscovery(c.ShutdownCh, coreConfig.AdvertiseAddr); err != nil { + activeFunc := func() bool { + if isLeader, _, err := core.Leader(); err != nil { + return isLeader + } + return false + } + + sealedFunc := func() bool { + if sealed, err := core.Sealed(); err != nil { + return sealed + } + return true + } + + if err := sd.RunServiceDiscovery(c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil { c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) return 1 } diff --git a/physical/physical.go b/physical/physical.go index 5b98815b7b..51b331685b 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -48,21 +48,27 @@ type AdvertiseDetect interface { DetectHostAddr() (string, error) } +// Callback signatures for RunServiceDiscovery +type activeFunction func() bool +type sealedFunction func() bool + // ServiceDiscovery is an optional interface that an HABackend can implement. // If they do, the state of a backend is advertised to the service discovery // network. type ServiceDiscovery interface { - // AdvertiseActive is used to reflect whether or not a backend is in - // an active or standby state. - AdvertiseActive(bool) error + // NotifyActiveStateChange is used by Core to notify a backend + // capable of ServiceDiscovery that this Vault instance has changed + // its status to active or standby. + NotifyActiveStateChange() error - // AdvertiseSealed is used to reflect whether or not a backend is in - // a sealed state or not. - AdvertiseSealed(bool) error + // NotifySealedStateChange is used by Core to notify a backend + // capable of ServiceDiscovery that Vault has changed its Sealed + // status to sealed or unsealed. + NotifySealedStateChange() error // Run executes any background service discovery tasks until the // shutdown channel is closed. - RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) error + RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) error } type Lock interface { diff --git a/vault/core.go b/vault/core.go index 4c209e0665..ff47c6047a 100644 --- a/vault/core.go +++ b/vault/core.go @@ -945,11 +945,9 @@ func (c *Core) Unseal(key []byte) (bool, error) { if c.ha != nil { sd, ok := c.ha.(physical.ServiceDiscovery) if ok { - go func() { - if err := sd.AdvertiseSealed(false); err != nil { - c.logger.Printf("[WARN] core: failed to advertise unsealed status: %v", err) - } - }() + if err := sd.NotifySealedStateChange(); err != nil { + c.logger.Printf("[WARN] core: failed to notify unsealed status: %v", err) + } } } return true, nil @@ -1100,11 +1098,9 @@ func (c *Core) sealInternal() error { if c.ha != nil { sd, ok := c.ha.(physical.ServiceDiscovery) if ok { - go func() { - if err := sd.AdvertiseSealed(true); err != nil { - c.logger.Printf("[WARN] core: failed to advertise sealed status: %v", err) - } - }() + if err := sd.NotifySealedStateChange(); err != nil { + c.logger.Printf("[WARN] core: failed to notify sealed status: %v", err) + } } } @@ -1424,11 +1420,9 @@ func (c *Core) advertiseLeader(uuid string, leaderLostCh <-chan struct{}) error sd, ok := c.ha.(physical.ServiceDiscovery) if ok { - go func() { - if err := sd.AdvertiseActive(true); err != nil { - c.logger.Printf("[WARN] core: failed to advertise active status: %v", err) - } - }() + if err := sd.NotifyActiveStateChange(); err != nil { + c.logger.Printf("[WARN] core: failed to notify active status: %v", err) + } } return nil } @@ -1460,11 +1454,9 @@ func (c *Core) clearLeader(uuid string) error { // Advertise ourselves as a standby sd, ok := c.ha.(physical.ServiceDiscovery) if ok { - go func() { - if err := sd.AdvertiseActive(false); err != nil { - c.logger.Printf("[WARN] core: failed to advertise standby status: %v", err) - } - }() + if err := sd.NotifyActiveStateChange(); err != nil { + c.logger.Printf("[WARN] core: failed to notify standby status: %v", err) + } } return err From f2fd046abfcd11b83881606ab1e2fe2fbd3aa092 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 28 Apr 2016 10:57:04 -0700 Subject: [PATCH 3/7] Name the output parameters for Leader --- vault/core.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vault/core.go b/vault/core.go index ff47c6047a..62f0afcfcb 100644 --- a/vault/core.go +++ b/vault/core.go @@ -784,7 +784,7 @@ func (c *Core) Standby() (bool, error) { } // Leader is used to get the current active leader -func (c *Core) Leader() (bool, string, error) { +func (c *Core) Leader() (isLeader bool, leaderAddr string, err error) { c.stateLock.RLock() defer c.stateLock.RUnlock() // Check if HA enabled From 46f48f79f400a122bc1f4499dbd56d3254f77456 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 28 Apr 2016 11:00:39 -0700 Subject: [PATCH 4/7] Debug log consul configuration parameters when set --- physical/consul.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/physical/consul.go b/physical/consul.go index 30c1afb7b3..054cc113a8 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -79,6 +79,7 @@ func newConsulBackend(conf map[string]string, logger *log.Logger) (Backend, erro if !ok { path = "vault/" } + logger.Printf("[DEBUG]: consul: config path set to %v", path) // Ensure path is suffixed but not prefixed if !strings.HasSuffix(path, "/") { @@ -100,12 +101,14 @@ func newConsulBackend(conf map[string]string, logger *log.Logger) (Backend, erro } disableRegistration = b } + logger.Printf("[DEBUG]: consul: config disable_registration set to %v", disableRegistration) // Get the service name to advertise in Consul service, ok := conf["service"] if !ok { service = defaultServiceName } + logger.Printf("[DEBUG]: consul: config service set to %s", service) checkTimeout := defaultCheckTimeout checkTimeoutStr, ok := conf["check_timeout"] @@ -121,6 +124,7 @@ func newConsulBackend(conf map[string]string, logger *log.Logger) (Backend, erro } checkTimeout = d + logger.Printf("[DEBUG]: consul: config check_timeout set to %v", d) } // Configure the client @@ -128,12 +132,15 @@ func newConsulBackend(conf map[string]string, logger *log.Logger) (Backend, erro if addr, ok := conf["address"]; ok { consulConf.Address = addr + logger.Printf("[DEBUG]: consul: config address set to %d", addr) } if scheme, ok := conf["scheme"]; ok { consulConf.Scheme = scheme + logger.Printf("[DEBUG]: consul: config scheme set to %d", scheme) } if token, ok := conf["token"]; ok { consulConf.Token = token + logger.Printf("[DEBUG]: consul: config token set") } if consulConf.Scheme == "https" { @@ -146,6 +153,7 @@ func newConsulBackend(conf map[string]string, logger *log.Logger) (Backend, erro transport.MaxIdleConnsPerHost = 4 transport.TLSClientConfig = tlsClientConfig consulConf.HttpClient.Transport = transport + logger.Printf("[DEBUG]: consul: configured TLS") } client, err := api.NewClient(consulConf) From 77dc1bd96b055489341b4129c31c430100f582a7 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 28 Apr 2016 11:04:49 -0700 Subject: [PATCH 5/7] Refactor the Consul Backend to match evented demuxer --- physical/consul.go | 517 ++++++++++++++++++++++------------------ physical/consul_test.go | 137 +++-------- 2 files changed, 311 insertions(+), 343 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index 054cc113a8..d2c76104bb 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -30,22 +30,24 @@ const ( // be executed too close to the TTL check timeout checkMinBuffer = 100 * time.Millisecond + // consulRetryInterval specifies the retry duration to use when an + // API call to the Consul agent fails. + consulRetryInterval = 1 * time.Second + // defaultCheckTimeout changes the timeout of TTL checks defaultCheckTimeout = 5 * time.Second - // defaultCheckInterval specifies the default interval used to send - // checks - defaultCheckInterval = 4 * time.Second - // defaultServiceName is the default Consul service name used when // advertising a Vault instance. defaultServiceName = "vault" - // registrationRetryInterval specifies the retry duration to use when - // a registration to the Consul agent fails. - registrationRetryInterval = 1 * time.Second + // reconcileTimeout is how often Vault should query Consul to detect + // and fix any state drift. + reconcileTimeout = 60 * time.Second ) +type notifyEvent struct{} + // ConsulBackend is a physical backend that stores data at specific // prefix within Consul. It is used for most production situations as // it allows Vault to run on multiple machines in a highly-available manner. @@ -56,19 +58,14 @@ type ConsulBackend struct { kv *api.KV permitPool *PermitPool serviceLock sync.RWMutex - service *api.AgentServiceRegistration - sealedCheck *api.AgentCheckRegistration - registrationLock int64 advertiseHost string advertisePort int64 - consulClientConf *api.Config serviceName string - running bool - active bool - unsealed bool disableRegistration bool checkTimeout time.Duration - checkTimer *time.Timer + + notifyActiveCh chan notifyEvent + notifySealedCh chan notifyEvent } // newConsulBackend constructs a Consul backend using the given API client @@ -178,235 +175,13 @@ func newConsulBackend(conf map[string]string, logger *log.Logger) (Backend, erro client: client, kv: client.KV(), permitPool: NewPermitPool(maxParInt), - consulClientConf: consulConf, serviceName: service, checkTimeout: checkTimeout, - checkTimer: time.NewTimer(checkTimeout), disableRegistration: disableRegistration, } return c, nil } -// serviceTags returns all of the relevant tags for Consul. -func serviceTags(active bool) []string { - activeTag := "standby" - if active { - activeTag = "active" - } - return []string{activeTag} -} - -func (c *ConsulBackend) AdvertiseActive(active bool) error { - c.serviceLock.Lock() - defer c.serviceLock.Unlock() - - // Vault is still bootstrapping - if c.service == nil { - return nil - } - - // Save a cached copy of the active state: no way to query Core - c.active = active - - // Ensure serial registration to the Consul agent. Allow for - // concurrent calls to update active status while a single task - // attempts, until successful, to update the Consul Agent. - if !c.disableRegistration && atomic.CompareAndSwapInt64(&c.registrationLock, 0, 1) { - defer atomic.CompareAndSwapInt64(&c.registrationLock, 1, 0) - - // Retry agent registration until successful - for { - c.service.Tags = serviceTags(c.active) - agent := c.client.Agent() - err := agent.ServiceRegister(c.service) - if err == nil { - // Success - return nil - } - - c.logger.Printf("[WARN] consul: service registration failed: %v", err) - c.serviceLock.Unlock() - time.Sleep(registrationRetryInterval) - c.serviceLock.Lock() - - if !c.running { - // Shutting down - return err - } - } - } - - // Successful concurrent update to active state - return nil -} - -func (c *ConsulBackend) AdvertiseSealed(sealed bool) error { - c.serviceLock.Lock() - defer c.serviceLock.Unlock() - c.unsealed = !sealed - - // Vault is still bootstrapping - if c.service == nil { - return nil - } - - if !c.disableRegistration { - // Push a TTL check immediately to update the state - c.runCheck() - } - - return nil -} - -func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) (err error) { - c.serviceLock.Lock() - defer c.serviceLock.Unlock() - - if c.disableRegistration { - return nil - } - - if err := c.setAdvertiseAddr(advertiseAddr); err != nil { - return err - } - - serviceID := c.serviceID() - - c.service = &api.AgentServiceRegistration{ - ID: serviceID, - Name: c.serviceName, - Tags: serviceTags(c.active), - Port: int(c.advertisePort), - Address: c.advertiseHost, - EnableTagOverride: false, - } - - checkStatus := api.HealthCritical - if c.unsealed { - checkStatus = api.HealthPassing - } - - c.sealedCheck = &api.AgentCheckRegistration{ - ID: c.checkID(), - Name: "Vault Sealed Status", - Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", - ServiceID: serviceID, - AgentServiceCheck: api.AgentServiceCheck{ - TTL: c.checkTimeout.String(), - Status: checkStatus, - }, - } - - agent := c.client.Agent() - if err := agent.ServiceRegister(c.service); err != nil { - return errwrap.Wrapf("service registration failed: {{err}}", err) - } - - if err := agent.CheckRegister(c.sealedCheck); err != nil { - return errwrap.Wrapf("service registration check registration failed: {{err}}", err) - } - - go c.checkRunner(shutdownCh) - c.running = true - - // Deregister upon shutdown - go func() { - shutdown: - for { - select { - case <-shutdownCh: - c.logger.Printf("[INFO]: consul: Shutting down consul backend") - break shutdown - } - } - - if err := agent.ServiceDeregister(serviceID); err != nil { - c.logger.Printf("[WARN]: consul: service deregistration failed: %v", err) - } - c.running = false - }() - - return nil -} - -// checkRunner periodically runs TTL checks -func (c *ConsulBackend) checkRunner(shutdownCh ShutdownChannel) { - defer c.checkTimer.Stop() - - for { - select { - case <-c.checkTimer.C: - go func() { - c.serviceLock.Lock() - defer c.serviceLock.Unlock() - c.runCheck() - }() - case <-shutdownCh: - return - } - } -} - -// runCheck immediately pushes a TTL check. Assumes c.serviceLock is held -// exclusively. -func (c *ConsulBackend) runCheck() { - // Reset timer before calling run check in order to not slide the - // window of the next check. - c.checkTimer.Reset(lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor)) - - // Run a TTL check - agent := c.client.Agent() - if c.unsealed { - agent.PassTTL(c.checkID(), "Vault Unsealed") - } else { - agent.FailTTL(c.checkID(), "Vault Sealed") - } -} - -// checkID returns the ID used for a Consul Check. Assume at least a read -// lock is held. -func (c *ConsulBackend) checkID() string { - return "vault-sealed-check" -} - -// serviceID returns the Vault ServiceID for use in Consul. Assume at least -// a read lock is held. -func (c *ConsulBackend) serviceID() string { - return fmt.Sprintf("%s:%s:%d", c.serviceName, c.advertiseHost, c.advertisePort) -} - -func (c *ConsulBackend) setAdvertiseAddr(addr string) (err error) { - if addr == "" { - return fmt.Errorf("advertise address must not be empty") - } - - url, err := url.Parse(addr) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise URL "%v": {{err}}`, addr), err) - } - - var portStr string - c.advertiseHost, portStr, err = net.SplitHostPort(url.Host) - if err != nil { - if url.Scheme == "http" { - portStr = "80" - } else if url.Scheme == "https" { - portStr = "443" - } else if url.Scheme == "unix" { - portStr = "-1" - c.advertiseHost = url.Path - } else { - return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) - } - } - c.advertisePort, err = strconv.ParseInt(portStr, 10, 0) - if err != nil || c.advertisePort < -1 || c.advertisePort > 65535 { - return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) - } - - return nil -} - func setupTLSConfig(conf map[string]string) (*tls.Config, error) { serverName := strings.Split(conf["address"], ":") @@ -585,3 +360,271 @@ func (c *ConsulLock) Value() (bool, string, error) { value := string(pair.Value) return held, value, nil } + +func (c *ConsulBackend) NotifyActiveStateChange() error { + select { + case c.notifyActiveCh <- notifyEvent{}: + default: + // NOTE: If this occurs Vault's active status could be out of + // sync with Consul until reconcileTimer expires. + c.logger.Printf("[WARN]: consul: Concurrent state change notify dropped") + } + + return nil +} + +func (c *ConsulBackend) NotifySealedStateChange() error { + select { + case c.notifySealedCh <- notifyEvent{}: + return nil + default: + // NOTE: If this occurs Vault's sealed status could be out of + // sync with Consul until checkTimer expires. + c.logger.Printf("[WARN]: consul: Concurrent sealed state change notify dropped") + } + + return nil +} + +func (c *ConsulBackend) checkDuration() time.Duration { + return lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor) +} + +func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) (err error) { + if err := c.setAdvertiseAddr(advertiseAddr); err != nil { + return err + } + + go c.runEventDemuxer(shutdownCh, advertiseAddr, activeFunc, sealedFunc) + + return nil +} + +func (c *ConsulBackend) runEventDemuxer(shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { + // Fire the reconcileTimer immediately upon starting the event demuxer + reconcileTimer := time.NewTimer(0) + defer reconcileTimer.Stop() + + // Schedule the first check. Consul TTL checks are passing by + // default, checkTimer does not need to be run immediately. + checkTimer := time.NewTimer(c.checkDuration()) + defer checkTimer.Stop() + + // Use a reactor pattern to handle and dispatch events to singleton + // goroutine handlers for execution. It is not acceptable to drop + // inbound events from Notify*(). + // + // goroutines are dispatched if the demuxer can acquire a lock (via + // an atomic CAS incr) on the handler. Handlers are responsible for + // deregistering themselves (atomic CAS decr). Handlers and the + // demuxer share a lock to synchronize information at the beginning + // and end of a handler's life (or after a handler wakes up from + // sleeping during a back-off/retry). + var shutdown bool + var checkLock int64 + var registeredServiceID string + var serviceRegLock int64 +shutdown: + for { + select { + case <-c.notifyActiveCh: + // Run reconcile immediately upon active state change notification + reconcileTimer.Reset(0) + case <-c.notifySealedCh: + // Run check timer immediately upon a seal state change notification + checkTimer.Reset(0) + case <-reconcileTimer.C: + // Unconditionally rearm the reconcileTimer + reconcileTimer.Reset(reconcileTimeout - lib.RandomStagger(reconcileTimeout/checkJitterFactor)) + + // Abort if service discovery is disabled or a + // reconcile handler is already active + if !c.disableRegistration && atomic.CompareAndSwapInt64(&serviceRegLock, 0, 1) { + // Enter handler with serviceRegLock held + go func() { + defer atomic.CompareAndSwapInt64(&serviceRegLock, 1, 0) + for !shutdown { + serviceID, err := c.reconcileConsul(registeredServiceID, activeFunc, sealedFunc) + if err != nil { + c.logger.Printf("[WARN]: consul: reconcile unable to talk with Consul backend: %v", err) + time.Sleep(consulRetryInterval) + continue + } + + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + + registeredServiceID = serviceID + return + } + }() + } + case <-checkTimer.C: + checkTimer.Reset(c.checkDuration()) + // Abort if service discovery is disabled or a + // reconcile handler is active + if !c.disableRegistration && atomic.CompareAndSwapInt64(&checkLock, 0, 1) { + // Enter handler with serviceRegLock held + go func() { + defer atomic.CompareAndSwapInt64(&checkLock, 1, 0) + for !shutdown { + unsealed := sealedFunc() + if err := c.runCheck(unsealed); err != nil { + c.logger.Printf("[WARN]: consul: check unable to talk with Consul backend: %v", err) + time.Sleep(consulRetryInterval) + continue + } + return + } + }() + } + case <-shutdownCh: + c.logger.Printf("[INFO]: consul: Shutting down consul backend") + shutdown = true + break shutdown + } + } + + c.serviceLock.RLock() + defer c.serviceLock.RUnlock() + if err := c.client.Agent().ServiceDeregister(registeredServiceID); err != nil { + c.logger.Printf("[WARN]: consul: service deregistration failed: %v", err) + } +} + +// checkID returns the ID used for a Consul Check. Assume at least a read +// lock is held. +func (c *ConsulBackend) checkID() string { + return "vault-sealed-check" +} + +// reconcileConsul queries the state of Vault Core and Consul and fixes up +// Consul's state according to what's in Vault. reconcileConsul is called +// without any locks held and can be run concurrently, therefore no changes +// to ConsulBackend can be made in this method (i.e. wtb const receiver for +// compiler enforced safety). +func (c *ConsulBackend) reconcileConsul(registeredServiceID string, activeFunc activeFunction, sealedFunc sealedFunction) (serviceID string, err error) { + // Query vault Core for its current state + active := activeFunc() + sealed := sealedFunc() + + agent := c.client.Agent() + + // Get the current state of Vault from Consul + var currentVaultService *api.AgentService + if services, err := agent.Services(); err == nil { + if service, ok := services[c.serviceName]; ok { + currentVaultService = service + } + } + + serviceID = c.serviceID() + tags := serviceTags(active) + + var reregister bool + if currentVaultService == nil || registeredServiceID == "" { + reregister = true + } else { + if len(currentVaultService.Tags) != 1 || currentVaultService.Tags[0] != tags[0] { + reregister = true + } + } + + if !reregister { + return "", nil + } + + service := &api.AgentServiceRegistration{ + ID: serviceID, + Name: c.serviceName, + Tags: tags, + Port: int(c.advertisePort), + Address: c.advertiseHost, + EnableTagOverride: false, + } + + checkStatus := api.HealthCritical + if !sealed { + checkStatus = api.HealthPassing + } + + sealedCheck := &api.AgentCheckRegistration{ + ID: c.checkID(), + Name: "Vault Sealed Status", + Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + TTL: c.checkTimeout.String(), + Status: checkStatus, + }, + } + + if err := agent.ServiceRegister(service); err != nil { + return "", errwrap.Wrapf(`service registration failed: {{err}}`, err) + } + + if err := agent.CheckRegister(sealedCheck); err != nil { + return serviceID, errwrap.Wrapf(`service check registration failed: {{err}}`, err) + } + + return serviceID, nil +} + +// runCheck immediately pushes a TTL check. Assumes c.serviceLock is held +// exclusively. +func (c *ConsulBackend) runCheck(unsealed bool) error { + // Run a TTL check + agent := c.client.Agent() + if unsealed { + return agent.PassTTL(c.checkID(), "Vault Unsealed") + } else { + return agent.FailTTL(c.checkID(), "Vault Sealed") + } +} + +// serviceID returns the Vault ServiceID for use in Consul. Assume at least +// a read lock is held. +func (c *ConsulBackend) serviceID() string { + return fmt.Sprintf("%s:%s:%d", c.serviceName, c.advertiseHost, c.advertisePort) +} + +// serviceTags returns all of the relevant tags for Consul. +func serviceTags(active bool) []string { + activeTag := "standby" + if active { + activeTag = "active" + } + return []string{activeTag} +} + +func (c *ConsulBackend) setAdvertiseAddr(addr string) (err error) { + if addr == "" { + return fmt.Errorf("advertise address must not be empty") + } + + url, err := url.Parse(addr) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise URL "%v": {{err}}`, addr), err) + } + + var portStr string + c.advertiseHost, portStr, err = net.SplitHostPort(url.Host) + if err != nil { + if url.Scheme == "http" { + portStr = "80" + } else if url.Scheme == "https" { + portStr = "443" + } else if url.Scheme == "unix" { + portStr = "-1" + c.advertiseHost = url.Path + } else { + return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + } + } + c.advertisePort, err = strconv.ParseInt(portStr, 10, 0) + if err != nil || c.advertisePort < -1 || c.advertisePort > 65535 { + return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) + } + + return nil +} diff --git a/physical/consul_test.go b/physical/consul_test.go index e7d4c3cbca..32284e7719 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -3,6 +3,7 @@ package physical import ( "fmt" "log" + "math/rand" "os" "reflect" "testing" @@ -39,28 +40,6 @@ func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend { t.Fatalf("Expected ConsulBackend") } - c.consulClientConf = api.DefaultConfig() - - c.service = &api.AgentServiceRegistration{ - ID: c.serviceID(), - Name: c.serviceName, - Tags: serviceTags(c.active), - Port: 8200, - Address: testHostIP(), - EnableTagOverride: false, - } - - c.sealedCheck = &api.AgentCheckRegistration{ - ID: c.checkID(), - Name: "Vault Sealed Status", - Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", - ServiceID: c.serviceID(), - AgentServiceCheck: api.AgentServiceCheck{ - TTL: c.checkTimeout.String(), - Status: api.HealthPassing, - }, - } - return c } @@ -69,21 +48,27 @@ func testConsul_testConsulBackend(t *testing.T) { if c == nil { t.Fatalf("bad") } +} - if c.active != false { - t.Fatalf("bad") +func testActiveFunc(activePct float64) activeFunction { + return func() bool { + var active bool + standbyProb := rand.Float64() + if standbyProb > activePct { + active = true + } + return active } +} - if c.unsealed != false { - t.Fatalf("bad") - } - - if c.service == nil { - t.Fatalf("bad") - } - - if c.sealedCheck == nil { - t.Fatalf("bad") +func testSealedFunc(sealedPct float64) sealedFunction { + return func() bool { + var sealed bool + unsealedProb := rand.Float64() + if unsealedProb > sealedPct { + sealed = true + } + return sealed } } @@ -165,8 +150,15 @@ func TestConsul_newConsulBackend(t *testing.T) { } c.disableRegistration = true + if c.disableRegistration == false { + addr := os.Getenv("CONSUL_HTTP_ADDR") + if addr == "" { + continue + } + } + var shutdownCh ShutdownChannel - if err := c.RunServiceDiscovery(shutdownCh, test.advertiseAddr); err != nil { + if err := c.RunServiceDiscovery(shutdownCh, test.advertiseAddr, testActiveFunc(0.5), testSealedFunc(0.5)); err != nil { t.Fatalf("bad: %v", err) } @@ -182,18 +174,6 @@ func TestConsul_newConsulBackend(t *testing.T) { t.Errorf("bad: %v != %v", test.service, c.serviceName) } - if test.address != c.consulClientConf.Address { - t.Errorf("bad: %s %v != %v", test.name, test.address, c.consulClientConf.Address) - } - - if test.scheme != c.consulClientConf.Scheme { - t.Errorf("bad: %v != %v", test.scheme, c.consulClientConf.Scheme) - } - - if test.token != c.consulClientConf.Token { - t.Errorf("bad: %v != %v", test.token, c.consulClientConf.Token) - } - // FIXME(sean@): Unable to test max_parallel // if test.max_parallel != cap(c.permitPool) { // t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool)) @@ -289,7 +269,7 @@ func TestConsul_setAdvertiseAddr(t *testing.T) { } } -func TestConsul_AdvertiseActive(t *testing.T) { +func TestConsul_NotifyActiveStateChange(t *testing.T) { addr := os.Getenv("CONSUL_HTTP_ADDR") if addr == "" { t.Skipf("No consul process running, skipping test") @@ -297,32 +277,12 @@ func TestConsul_AdvertiseActive(t *testing.T) { c := testConsulBackend(t) - if c.active != false { - t.Fatalf("bad") - } - - if err := c.AdvertiseActive(true); err != nil { - t.Fatalf("bad: %v", err) - } - - if err := c.AdvertiseActive(true); err != nil { - t.Fatalf("bad: %v", err) - } - - if err := c.AdvertiseActive(false); err != nil { - t.Fatalf("bad: %v", err) - } - - if err := c.AdvertiseActive(false); err != nil { - t.Fatalf("bad: %v", err) - } - - if err := c.AdvertiseActive(true); err != nil { + if err := c.NotifyActiveStateChange(); err != nil { t.Fatalf("bad: %v", err) } } -func TestConsul_AdvertiseSealed(t *testing.T) { +func TestConsul_NotifySealedStateChange(t *testing.T) { addr := os.Getenv("CONSUL_HTTP_ADDR") if addr == "" { t.Skipf("No consul process running, skipping test") @@ -330,44 +290,9 @@ func TestConsul_AdvertiseSealed(t *testing.T) { c := testConsulBackend(t) - if c.unsealed == true { - t.Fatalf("bad") - } - - if err := c.AdvertiseSealed(true); err != nil { + if err := c.NotifySealedStateChange(); err != nil { t.Fatalf("bad: %v", err) } - if c.unsealed == true { - t.Fatalf("bad") - } - - if err := c.AdvertiseSealed(true); err != nil { - t.Fatalf("bad: %v", err) - } - if c.unsealed == true { - t.Fatalf("bad") - } - - if err := c.AdvertiseSealed(false); err != nil { - t.Fatalf("bad: %v", err) - } - if c.unsealed == false { - t.Fatalf("bad") - } - - if err := c.AdvertiseSealed(false); err != nil { - t.Fatalf("bad: %v", err) - } - if c.unsealed == false { - t.Fatalf("bad") - } - - if err := c.AdvertiseSealed(true); err != nil { - t.Fatalf("bad: %v", err) - } - if c.unsealed == true { - t.Fatalf("bad") - } } func TestConsul_checkID(t *testing.T) { From 01fac00f20f4cae6fd47798004f70729c00ccea9 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 28 Apr 2016 13:16:17 -0700 Subject: [PATCH 6/7] Remove useless return statement --- physical/consul.go | 1 - 1 file changed, 1 deletion(-) diff --git a/physical/consul.go b/physical/consul.go index d2c76104bb..f9c26dc732 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -376,7 +376,6 @@ func (c *ConsulBackend) NotifyActiveStateChange() error { func (c *ConsulBackend) NotifySealedStateChange() error { select { case c.notifySealedCh <- notifyEvent{}: - return nil default: // NOTE: If this occurs Vault's sealed status could be out of // sync with Consul until checkTimer expires. From 1df788a73cd3273286f44ecaa32de85e9ab4a110 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Fri, 29 Apr 2016 11:35:33 -0700 Subject: [PATCH 7/7] Be idiomatic. Use a switch instead of if/else --- physical/consul.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index f9c26dc732..11dfaf048b 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -521,10 +521,14 @@ func (c *ConsulBackend) reconcileConsul(registeredServiceID string, activeFunc a tags := serviceTags(active) var reregister bool - if currentVaultService == nil || registeredServiceID == "" { + switch { + case currentVaultService == nil, + registeredServiceID == "": reregister = true - } else { - if len(currentVaultService.Tags) != 1 || currentVaultService.Tags[0] != tags[0] { + default: + switch { + case len(currentVaultService.Tags) != 1, + currentVaultService.Tags[0] != tags[0]: reregister = true } }