diff --git a/physical/consul.go b/physical/consul.go index c1a399801c..1dc8901edd 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -111,3 +111,52 @@ func (c *ConsulBackend) List(prefix string) ([]string, error) { sort.Strings(out) return out, err } + +// LockWith is used for mutual exclusion based on the given key. +func (c *ConsulBackend) LockWith(key, value string) (Lock, error) { + // Create the lock + opts := &api.LockOptions{ + Key: key, + Value: []byte(value), + SessionName: "Vault Lock", + } + lock, err := c.client.LockOpts(opts) + if err != nil { + return nil, fmt.Errorf("failed to create lock: %v", err) + } + cl := &ConsulLock{ + client: c.client, + key: key, + lock: lock, + } + return cl, nil +} + +// ConsulLock is used to provide the Lock interface backed by Consul +type ConsulLock struct { + client *api.Client + key string + lock *api.Lock +} + +func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + return c.lock.Lock(stopCh) +} + +func (c *ConsulLock) Unlock() error { + return c.lock.Unlock() +} + +func (c *ConsulLock) Value() (bool, string, error) { + kv := c.client.KV() + pair, _, err := kv.Get(c.key, nil) + if err != nil { + return false, "", err + } + if pair == nil { + return false, "", nil + } + held := pair.Session != "" + value := string(pair.Value) + return held, value, nil +} diff --git a/physical/consul_test.go b/physical/consul_test.go index 6d2064e179..38a0da6ce4 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -37,4 +37,10 @@ func TestConsulBackend(t *testing.T) { testBackend(t, b) testBackend_ListPrefix(t, b) + + ha, ok := b.(HABackend) + if !ok { + t.Fatalf("consul does not implement HABackend") + } + testHABackend(t, ha, ha) } diff --git a/physical/inmem_ha.go b/physical/inmem_ha.go new file mode 100644 index 0000000000..92ee314f77 --- /dev/null +++ b/physical/inmem_ha.go @@ -0,0 +1,119 @@ +package physical + +import ( + "fmt" + "sync" +) + +type InmemHABackend struct { + InmemBackend
+ locks map[string]string + l sync.Mutex + cond *sync.Cond +} + +// NewInmemHA constructs a new in-memory HA backend. This is only for testing. +func NewInmemHA() *InmemHABackend { + in := &InmemHABackend{ + InmemBackend: *NewInmem(), + locks: make(map[string]string), + } + in.cond = sync.NewCond(&in.l) + return in +} + +// LockWith is used for mutual exclusion based on the given key. +func (i *InmemHABackend) LockWith(key, value string) (Lock, error) { + l := &InmemLock{ + in: i, + key: key, + value: value, + } + return l, nil +} + +// InmemLock is an in-memory Lock implementation for the HABackend +type InmemLock struct { + in *InmemHABackend + key string + value string + + held bool + leaderCh chan struct{} + l sync.Mutex +} + +func (i *InmemLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + i.l.Lock() + defer i.l.Unlock() + if i.held { + return nil, fmt.Errorf("lock already held") + } + + // Attempt an async acquisition + didLock := make(chan struct{}) + releaseCh := make(chan bool, 1) + go func() { + // Wait to acquire the lock + i.in.l.Lock() + _, ok := i.in.locks[i.key] + for ok { + i.in.cond.Wait() + _, ok = i.in.locks[i.key] + } + i.in.locks[i.key] = i.value + i.in.l.Unlock() + + // Signal that lock is held + close(didLock) + + // Handle an early abort + release := <-releaseCh + if release { + i.in.l.Lock() + delete(i.in.locks, i.key) + i.in.l.Unlock() + i.in.cond.Broadcast() + } + }() + + // Wait for lock acquisition or shutdown + select { + case <-didLock: + releaseCh <- false + case <-stopCh: + releaseCh <- true + return nil, nil + } + + // Create the leader channel + i.held = true + i.leaderCh = make(chan struct{}) + return i.leaderCh, nil +} + +func (i *InmemLock) Unlock() error { + i.l.Lock() + defer i.l.Unlock() + + if !i.held { + return nil + } + + close(i.leaderCh) + i.leaderCh = nil + i.held = false + + i.in.l.Lock() + delete(i.in.locks, i.key) + i.in.l.Unlock() + i.in.cond.Broadcast() + return nil +} + +func (i *InmemLock) Value() 
(bool, string, error) { + i.in.l.Lock() + val, ok := i.in.locks[i.key] + i.in.l.Unlock() + return ok, val, nil +} diff --git a/physical/inmem_ha_test.go b/physical/inmem_ha_test.go new file mode 100644 index 0000000000..a13f8c7977 --- /dev/null +++ b/physical/inmem_ha_test.go @@ -0,0 +1,8 @@ +package physical + +import "testing" + +func TestInmemHA(t *testing.T) { + inm := NewInmemHA() + testHABackend(t, inm, inm) +} diff --git a/physical/physical.go b/physical/physical.go index 63ffe19f4b..51b607ac33 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -23,6 +23,29 @@ type Backend interface { List(prefix string) ([]string, error) } +// HABackend is an extension to the standard physical +// backend to support high-availability. Vault only expects to +// use mutual exclusion to allow multiple instances to act as a +// hot standby for a leader that services all requests. +type HABackend interface { + // LockWith is used for mutual exclusion based on the given key. + LockWith(key, value string) (Lock, error) +} + +type Lock interface { + // Lock is used to acquire the given lock + // The stopCh is optional and if closed should interrupt the lock + // acquisition attempt. The returned channel should be closed when + // leadership is lost.
+ Lock(stopCh <-chan struct{}) (<-chan struct{}, error) + + // Unlock is used to release the lock + Unlock() error + + // Returns the value of the lock and if it is held + Value() (bool, string, error) +} + // Entry is used to represent data stored by the physical backend type Entry struct { Key string diff --git a/physical/physical_test.go b/physical/physical_test.go index 2d0dbec479..f6fce1a11a 100644 --- a/physical/physical_test.go +++ b/physical/physical_test.go @@ -3,6 +3,7 @@ package physical import ( "reflect" "testing" + "time" ) func testNewBackend(t *testing.T) { @@ -159,3 +160,77 @@ func testBackend_ListPrefix(t *testing.T, b Backend) { t.Fatalf("bad: %v", keys) } } + +func testHABackend(t *testing.T, b HABackend, b2 HABackend) { + // Get the lock + lock, err := b.LockWith("foo", "bar") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Attempt to lock + leaderCh, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("failed to get leader ch") + } + + // Check the value + held, val, err := lock.Value() + if err != nil { + t.Fatalf("err: %v", err) + } + if !held { + t.Fatalf("should be held") + } + if val != "bar" { + t.Fatalf("bad value: %v", val) + } + + // Second acquisition should fail + lock2, err := b2.LockWith("foo", "baz") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Cancel attempt in 50 msec + stopCh := make(chan struct{}) + time.AfterFunc(50*time.Millisecond, func() { + close(stopCh) + }) + + // Attempt to lock + leaderCh2, err := lock2.Lock(stopCh) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh2 != nil { + t.Fatalf("should not get leader ch") + } + + // Release the first lock + lock.Unlock() + + // Attempt to lock should work + leaderCh2, err = lock2.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh2 == nil { + t.Fatalf("should get leader ch") + } + + // Check the value + held, val, err = lock.Value() + if err != nil { + t.Fatalf("err: %v",
err) + } + if !held { + t.Fatalf("should be held") + } + if val != "baz" { + t.Fatalf("bad value: %v", val) + } +} diff --git a/vault/core.go b/vault/core.go index a0ab724e47..2745a44771 100644 --- a/vault/core.go +++ b/vault/core.go @@ -24,6 +24,18 @@ const ( // it even with the Vault sealed. This is required so that we know // how many secret parts must be used to reconstruct the master key. coreSealConfigPath = "core/seal-config" + + // coreLockPath is the path used to acquire a coordinating lock + // for a highly-available deploy. + coreLockPath = "core/lock" + + // coreLeaderPrefix is the prefix used for the UUID that contains + // the currently elected leader. + coreLeaderPrefix = "core/leader/" + + // lockRetryInterval is the interval we re-attempt to acquire the + // HA lock if an error is encountered + lockRetryInterval = 10 * time.Second ) var ( @@ -31,6 +43,10 @@ var ( // a sealed barrier. No operation is expected to succeed before unsealing ErrSealed = errors.New("Vault is sealed") + // ErrStandby is returned if an operation is performed on + // a standby Vault. No operation is expected to succeed until active. + ErrStandby = errors.New("Vault is in standby mode") + // ErrAlreadyInit is returned if the core is already // initialized. This prevents a re-initialization. ErrAlreadyInit = errors.New("Vault is already initialized") @@ -42,6 +58,10 @@ var ( // ErrInternalError is returned when we don't want to leak // any information about an internal error ErrInternalError = errors.New("internal error") + + // ErrHANotEnabled is returned if the operation only makes sense + // in an HA setting + ErrHANotEnabled = errors.New("Vault is not configured for highly-available mode") ) // SealConfig is used to describe the seal configuration @@ -96,6 +116,12 @@ func (e *ErrInvalidKey) Error() string { // interface for API handlers and is responsible for managing the logical and physical // backends, router, security barrier, and audit trails.
type Core struct { + // HABackend may be available depending on the physical backend + ha physical.HABackend + + // AdvertiseAddr is the address we advertise as leader if held + advertiseAddr string + // physical backend is the un-trusted backend with durable data physical physical.Backend @@ -118,6 +144,10 @@ type Core struct { stateLock sync.RWMutex sealed bool + standby bool + standbyDoneCh chan struct{} + standbyStopCh chan struct{} + // unlockParts has the keys provided to Unseal until // the threshold number of parts is available. unlockParts [][]byte @@ -167,12 +197,22 @@ type CoreConfig struct { AuditBackends map[string]audit.Factory Physical physical.Backend Logger *log.Logger - DisableCache bool // Disables the LRU cache on the physical backend - CacheSize int // Custom cache size of zero for default + DisableCache bool // Disables the LRU cache on the physical backend + CacheSize int // Custom cache size of zero for default + AdvertiseAddr string // Set as the leader address for HA } // NewCore isk used to construct a new core func NewCore(conf *CoreConfig) (*Core, error) { + // Check if this backend supports an HA configuration + var haBackend physical.HABackend + if ha, ok := conf.Physical.(physical.HABackend); ok { + haBackend = ha + } + if haBackend != nil && conf.AdvertiseAddr == "" { + return nil, fmt.Errorf("missing advertisement address") + } + // Wrap the backend in a cache unless disabled if !conf.DisableCache { _, isCache := conf.Physical.(*physical.Cache) @@ -196,11 +236,14 @@ func NewCore(conf *CoreConfig) (*Core, error) { // Setup the core c := &Core{ - physical: conf.Physical, - barrier: barrier, - router: NewRouter(), - sealed: true, - logger: conf.Logger, + ha: haBackend, + advertiseAddr: conf.AdvertiseAddr, + physical: conf.Physical, + barrier: barrier, + router: NewRouter(), + sealed: true, + standby: true, + logger: conf.Logger, } // Setup the backends @@ -238,6 +281,9 @@ func (c *Core) HandleRequest(req *logical.Request)
(*logical.Response, error) { if c.sealed { return nil, ErrSealed } + if c.standby { + return nil, ErrStandby + } if c.router.LoginPath(req.Path) { return c.handleLoginRequest(req) @@ -582,6 +628,61 @@ func (c *Core) Sealed() (bool, error) { return c.sealed, nil } +// Standby checks if the Vault is in standby mode +func (c *Core) Standby() (bool, error) { + c.stateLock.RLock() + defer c.stateLock.RUnlock() + return c.standby, nil +} + +// Leader is used to get the current active leader +func (c *Core) Leader() (bool, string, error) { + c.stateLock.RLock() + defer c.stateLock.RUnlock() + // Check if sealed + if c.sealed { + return false, "", ErrSealed + } + + // Check if HA enabled + if c.ha == nil { + return false, "", ErrHANotEnabled + } + + // Check if we are the leader + if !c.standby { + return true, c.advertiseAddr, nil + } + + // Initialize a lock + lock, err := c.ha.LockWith(coreLockPath, "read") + if err != nil { + return false, "", err + } + + // Read the value + held, value, err := lock.Value() + if err != nil { + return false, "", err + } + if !held { + return false, "", nil + } + + // Value is the UUID of the leader, fetch the key + key := coreLeaderPrefix + value + entry, err := c.barrier.Get(key) + if err != nil { + return false, "", err + } + if entry == nil { + return false, "", nil + } + + // Leader address is in the entry + return false, string(entry.Value), nil +} + // SealConfiguration is used to return information // about the configuration of the Vault and it's current // status. 
@@ -695,15 +796,21 @@ func (c *Core) Unseal(key []byte) (bool, error) { } c.logger.Printf("[INFO] core: vault is unsealed") - // Do post-unseal setup - c.logger.Printf("[INFO] core: post-unseal setup starting") - if err := c.postUnseal(); err != nil { - c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err) - c.barrier.Seal() - c.logger.Printf("[WARN] core: vault is sealed") - return false, err + // Do post-unseal setup if HA is not enabled + if c.ha == nil { + c.standby = false + if err := c.postUnseal(); err != nil { + c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err) + c.barrier.Seal() + c.logger.Printf("[WARN] core: vault is sealed") + return false, err + } + } else { + // Go to standby mode, wait until we are active to unseal + c.standbyDoneCh = make(chan struct{}) + c.standbyStopCh = make(chan struct{}) + go c.runStandby(c.standbyDoneCh, c.standbyStopCh) } - c.logger.Printf("[INFO] core: post-unseal setup complete") // Success! c.sealed = false @@ -726,15 +833,24 @@ func (c *Core) Seal(token string) error { return err } + // Mark the Vault as sealed to prevent further transactions c.sealed = true - // Do pre-seal teardown - c.logger.Printf("[INFO] core: pre-seal teardown starting") - if err := c.preSeal(); err != nil { - c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err) - return fmt.Errorf("internal error") + // Do pre-seal teardown if HA is not enabled + if c.ha == nil { + if err := c.preSeal(); err != nil { + c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err) + return fmt.Errorf("internal error") + } + } else { + // Signal the standby goroutine to shutdown, wait for completion + close(c.standbyStopCh) + + // Release the lock while we wait to avoid deadlocking + c.stateLock.Unlock() + <-c.standbyDoneCh + c.stateLock.Lock() } - c.logger.Printf("[INFO] core: pre-seal teardown complete") if err := c.barrier.Seal(); err != nil { return err @@ -749,6 +865,7 @@ func (c *Core) Seal(token string) error { //
credential stores, etc. func (c *Core) postUnseal() error { defer metrics.MeasureSince([]string{"core", "post_unseal"}, time.Now()) + c.logger.Printf("[INFO] core: post-unseal setup starting") if cache, ok := c.physical.(*physical.Cache); ok { cache.Purge() } @@ -781,6 +898,7 @@ func (c *Core) postUnseal() error { } c.metricsCh = make(chan struct{}) go c.emitMetrics(c.metricsCh) + c.logger.Printf("[INFO] core: post-unseal setup complete") return nil } @@ -788,6 +906,7 @@ func (c *Core) postUnseal() error { // for any state teardown required. func (c *Core) preSeal() error { defer metrics.MeasureSince([]string{"core", "pre_seal"}, time.Now()) + c.logger.Printf("[INFO] core: pre-seal teardown starting") if c.metricsCh != nil { close(c.metricsCh) c.metricsCh = nil @@ -813,9 +932,127 @@ func (c *Core) preSeal() error { if cache, ok := c.physical.(*physical.Cache); ok { cache.Purge() } + c.logger.Printf("[INFO] core: pre-seal teardown complete") return nil } +// runStandby is a long running routine that is used when an HA backend +// is enabled. It waits until we are leader and switches this Vault to +// active. 
+func (c *Core) runStandby(doneCh, stopCh chan struct{}) { + defer close(doneCh) + c.logger.Printf("[INFO] core: entering standby mode") + for { + // Check for a shutdown + select { + case <-stopCh: + return + default: + } + + // Create a lock + uuid := generateUUID() + lock, err := c.ha.LockWith(coreLockPath, uuid) + if err != nil { + c.logger.Printf("[ERR] core: failed to create lock: %v", err) + return + } + + // Attempt the acquisition + leaderCh := c.acquireLock(lock, stopCh) + + // Bail if we are being shutdown + if leaderCh == nil { + return + } + c.logger.Printf("[INFO] core: acquired lock, enabling active operation") + + // Advertise ourself as leader + if err := c.advertiseLeader(uuid); err != nil { + c.logger.Printf("[ERR] core: leader advertisement setup failed: %v", err) + lock.Unlock() + continue + } + + // Attempt the post-unseal process + c.stateLock.Lock() + err = c.postUnseal() + if err == nil { + c.standby = false + } + c.stateLock.Unlock() + + // Handle a failure to unseal + if err != nil { + c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err) + lock.Unlock() + continue + } + + // Monitor a loss of leadership + select { + case <-leaderCh: + c.logger.Printf("[WARN] core: leadership lost, stopping active operation") + case <-stopCh: + c.logger.Printf("[WARN] core: stopping active operation") + } + + // Clear ourself as leader + if err := c.clearLeader(uuid); err != nil { + c.logger.Printf("[ERR] core: clearing leader advertisement failed: %v", err) + } + + // Attempt the pre-seal process + c.stateLock.Lock() + c.standby = true + err = c.preSeal() + c.stateLock.Unlock() + + // Give up leadership + lock.Unlock() + + // Check the result of the preSeal call made above under stateLock; calling preSeal again here would repeat the teardown without the lock + if err != nil { + c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err) + continue + } + } +} + +// acquireLock blocks until the lock is acquired, returning the leaderCh +func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan
struct{} { + for { + // Attempt lock acquisition + leaderCh, err := lock.Lock(stopCh) + if err == nil { + return leaderCh + } + + // Retry the acquisition + c.logger.Printf("[ERR] core: failed to acquire lock: %v", err) + select { + case <-time.After(lockRetryInterval): + case <-stopCh: + return nil + } + } +} + +// advertiseLeader is used to advertise the current node as leader +func (c *Core) advertiseLeader(uuid string) error { + ent := &Entry{ + Key: coreLeaderPrefix + uuid, + Value: []byte(c.advertiseAddr), + } + return c.barrier.Put(ent) +} + +// clearLeader is used to clear our leadership entry +func (c *Core) clearLeader(uuid string) error { + key := coreLeaderPrefix + uuid + return c.barrier.Delete(key) +} + // emitMetrics is used to periodically expose metrics while runnig func (c *Core) emitMetrics(stopCh chan struct{}) { for { diff --git a/vault/core_test.go b/vault/core_test.go index 453fa36aac..8eeed93922 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -983,3 +983,170 @@ func TestCore_HandleRequest_CreateToken_Lease(t *testing.T) { t.Fatalf("bad: %#v", resp.Auth) } } + +func TestCore_Standby(t *testing.T) { + // Create the first core and initialize it + inm := physical.NewInmemHA() + core, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "foo"}) + if err != nil { + t.Fatalf("err: %v", err) + } + key, root := TestCoreInit(t, core) + if _, err := core.Unseal(TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + + // Verify unsealed + sealed, err := core.Sealed() + if err != nil { + t.Fatalf("err checking seal status: %s", err) + } + if sealed { + t.Fatal("should not be sealed") + } + + // Wait for core to become active + start := time.Now() + var standby bool + for time.Now().Sub(start) < time.Second { + standby, err = core.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + break + } + } + if standby { + t.Fatalf("should not be in standby mode") + } + + // Put a secret + req := 
&logical.Request{ + Operation: logical.WriteOperation, + Path: "secret/foo", + Data: map[string]interface{}{ + "foo": "bar", + }, + ClientToken: root, + } + _, err = core.HandleRequest(req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check the leader is local + isLeader, advertise, err := core.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if !isLeader { + t.Fatalf("should be leader") + } + if advertise != "foo" { + t.Fatalf("Bad advertise: %v", advertise) + } + + // Create a second core, attached to same in-memory store + core2, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "bar"}) + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err := core2.Unseal(TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + + // Verify unsealed + sealed, err = core2.Sealed() + if err != nil { + t.Fatalf("err checking seal status: %s", err) + } + if sealed { + t.Fatal("should not be sealed") + } + + // Core2 should be in standby + standby, err = core2.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + t.Fatalf("should be standby") + } + + // Request should fail in standby mode + _, err = core2.HandleRequest(req) + if err != ErrStandby { + t.Fatalf("err: %v", err) + } + + // Check the leader is not local + isLeader, advertise, err = core2.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if isLeader { + t.Fatalf("should not be leader") + } + if advertise != "foo" { + t.Fatalf("Bad advertise: %v", advertise) + } + + // Seal the first core, should step down + err = core.Seal(root) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Core should be in standby + standby, err = core.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + t.Fatalf("should be standby") + } + + // Wait for core2 to become active + start = time.Now() + for time.Now().Sub(start) < time.Second { + standby, err = core2.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + 
break + } + } + if standby { + t.Fatalf("should not be in standby mode") + } + + // Read the secret + req = &logical.Request{ + Operation: logical.ReadOperation, + Path: "secret/foo", + ClientToken: root, + } + resp, err := core2.HandleRequest(req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Verify the response + if resp.Data["foo"] != "bar" { + t.Fatalf("bad: %#v", resp) + } + + // Check the leader is local + isLeader, advertise, err = core2.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if !isLeader { + t.Fatalf("should be leader") + } + if advertise != "bar" { + t.Fatalf("Bad advertise: %v", advertise) + } +} diff --git a/vault/expiration.go b/vault/expiration.go index f4c1d00345..601722cf80 100644 --- a/vault/expiration.go +++ b/vault/expiration.go @@ -96,10 +96,12 @@ func (c *Core) setupExpiration() error { // stopExpiration is used to stop the expiration manager before // sealing the Vault. func (c *Core) stopExpiration() error { - if err := c.expiration.Stop(); err != nil { - return err + if c.expiration != nil { + if err := c.expiration.Stop(); err != nil { + return err + } + c.expiration = nil } - c.expiration = nil return nil }