From 9597035488d6c2cdde304b88747b556125b8bf1d Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 11:49:46 -0700 Subject: [PATCH 01/11] physical: First pass at HABackend --- physical/consul.go | 14 +++++++++++ physical/consul_test.go | 6 +++++ physical/physical.go | 20 +++++++++++++++ physical/physical_test.go | 51 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+) diff --git a/physical/consul.go b/physical/consul.go index c1a399801c..4c8c1886b5 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -111,3 +111,17 @@ func (c *ConsulBackend) List(prefix string) ([]string, error) { sort.Strings(out) return out, err } + +// Lock is used for mutual exclusion based on the given key. +func (c *ConsulBackend) LockWith(key string) (Lock, error) { + // Create the lock + opts := &api.LockOptions{ + Key: key, + SessionName: "Vault Lock", + } + lock, err := c.client.LockOpts(opts) + if err != nil { + return nil, fmt.Errorf("failed to create lock: %v", err) + } + return lock, nil +} diff --git a/physical/consul_test.go b/physical/consul_test.go index 6d2064e179..38a0da6ce4 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -37,4 +37,10 @@ func TestConsulBackend(t *testing.T) { testBackend(t, b) testBackend_ListPrefix(t, b) + + ha, ok := b.(HABackend) + if !ok { + t.Fatalf("consul does not implement HABackend") + } + testHABackend(t, ha, ha) } diff --git a/physical/physical.go b/physical/physical.go index 63ffe19f4b..47c4057c2d 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -23,6 +23,26 @@ type Backend interface { List(prefix string) ([]string, error) } +// HABackend is an extentions to the standard physical +// backend to support high-availability. Vault only expects to +// use mutual exclusion to allow multiple instances to act as a +// hot standby for a leader that services all requests. +type HABackend interface { + // LockWith is used for mutual exclusion based on the given key. + LockWith(key string) (Lock, error) +} + +type Lock interface { + // Lock is used to acquire the given lock + // The stopCh is optional and if closed should interrupt the lock + // acquisition attempt. The return struct should be closed when + // leadership is lost. + Lock(stopCh <-chan struct{}) (<-chan struct{}, error) + + // Unlock is used to release the lock + Unlock() error +} + // Entry is used to represent data stored by the physical backend type Entry struct { Key string diff --git a/physical/physical_test.go b/physical/physical_test.go index 2d0dbec479..93e524dda2 100644 --- a/physical/physical_test.go +++ b/physical/physical_test.go @@ -3,6 +3,7 @@ package physical import ( "reflect" "testing" + "time" ) func testNewBackend(t *testing.T) { @@ -159,3 +160,53 @@ func testBackend_ListPrefix(t *testing.T, b Backend) { t.Fatalf("bad: %v", keys) } } + +func testHABackend(t *testing.T, b HABackend, b2 HABackend) { + // Get the lock + lock, err := b.LockWith("foo") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Attempt to lock + leaderCh, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("failed to get leader ch") + } + + // Second acquisition should fail + lock2, err := b2.LockWith("foo") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Cancel attempt in 50 msec + stopCh := make(chan struct{}) + time.AfterFunc(50*time.Millisecond, func() { + close(stopCh) + }) + + // Attempt to lock + leaderCh2, err := lock2.Lock(stopCh) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh2 != nil { + t.Fatalf("should not get leader ch") + } + + // Release the first lock + lock.Unlock() + + // Attempt to lock should work + leaderCh2, err = lock2.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh2 == nil { + t.Fatalf("should get leader ch") + } +} From 18e9e587dd657f846d5734fff706a0c4d15deb3c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 12:04:15 -0700 Subject: [PATCH 02/11] physical: Adding inmem HA for testing --- physical/inmem_ha.go | 97 +++++++++++++++++++++++++++++++++++++++ physical/inmem_ha_test.go | 8 ++++ 2 files changed, 105 insertions(+) create mode 100644 physical/inmem_ha.go create mode 100644 physical/inmem_ha_test.go diff --git a/physical/inmem_ha.go b/physical/inmem_ha.go new file mode 100644 index 0000000000..895c7e4c0b --- /dev/null +++ b/physical/inmem_ha.go @@ -0,0 +1,97 @@ +package physical + +import ( + "fmt" + "sync" +) + +type InmemHABackend struct { + InmemBackend + locks map[string]*sync.Mutex + l sync.Mutex +} + +// NewInmemHA constructs a new in-memory HA backend. This is only for testing. +func NewInmemHA() *InmemHABackend { + in := &InmemHABackend{ + InmemBackend: *NewInmem(), + locks: make(map[string]*sync.Mutex), + } + return in +} + +// LockWith is used for mutual exclusion based on the given key. +func (i *InmemHABackend) LockWith(key string) (Lock, error) { + i.l.Lock() + defer i.l.Unlock() + + mutex, ok := i.locks[key] + if !ok { + mutex = new(sync.Mutex) + i.locks[key] = mutex + } + + return &InmemLock{mutex: mutex}, nil +} + +// InmemLock is an in-memory Lock implementation for the HABackend +type InmemLock struct { + // mutex is the underlying mutex, may be shared between + // instances of InmemLock + mutex *sync.Mutex + + held bool + leaderCh chan struct{} + l sync.Mutex +} + +func (i *InmemLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + i.l.Lock() + defer i.l.Unlock() + if i.held { + return nil, fmt.Errorf("lock already held") + } + + // Attempt an async acquisition + didLock := make(chan struct{}) + releaseCh := make(chan bool, 1) + go func() { + i.mutex.Lock() + close(didLock) + + // Handle an early abort + release := <-releaseCh + if release { + i.mutex.Unlock() + } + }() + + // Wait for lock acquisition or shutdown + select { + case <-didLock: + releaseCh <- false + case <-stopCh: + releaseCh <- true + return nil, nil + } + + // Create the leader channel + i.held = true + i.leaderCh = make(chan struct{}) + return i.leaderCh, nil +} + +func (i *InmemLock) Unlock() error { + i.l.Lock() + defer i.l.Unlock() + + if !i.held { + return nil + } + + close(i.leaderCh) + i.leaderCh = nil + i.held = false + i.mutex.Unlock() + return nil +} diff --git a/physical/inmem_ha_test.go b/physical/inmem_ha_test.go new file mode 100644 index 0000000000..a13f8c7977 --- /dev/null +++ b/physical/inmem_ha_test.go @@ -0,0 +1,8 @@ +package physical + +import "testing" + +func TestInmemHA(t *testing.T) { + inm := NewInmemHA() + testHABackend(t, inm, inm) +} From 8f4855d02d6b53b60602d3fddf011bd30d78ffd1 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 13:32:56 -0700 Subject: [PATCH 03/11] vault: stopExpiration should be idempotent --- vault/expiration.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vault/expiration.go b/vault/expiration.go index f4c1d00345..601722cf80 100644 --- a/vault/expiration.go +++ b/vault/expiration.go @@ -96,10 +96,12 @@ func (c *Core) setupExpiration() error { // stopExpiration is used to stop the expiration manager before // sealing the Vault. func (c *Core) stopExpiration() error { - if err := c.expiration.Stop(); err != nil { - return err + if c.expiration != nil { + if err := c.expiration.Stop(); err != nil { + return err + } + c.expiration = nil } - c.expiration = nil return nil } From 7aa7efebfb9397e23a0c4f9786fffee743009255 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 14:06:15 -0700 Subject: [PATCH 04/11] vault: first pass at HA standby mode --- vault/core.go | 159 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 145 insertions(+), 14 deletions(-) diff --git a/vault/core.go b/vault/core.go index a0ab724e47..0a478bdf7d 100644 --- a/vault/core.go +++ b/vault/core.go @@ -24,6 +24,14 @@ const ( // it even with the Vault sealed. This is required so that we know // how many secret parts must be used to reconstruct the master key. coreSealConfigPath = "core/seal-config" + + // coreLockPath is the path used to acquire a coordinating lock + // for a highly-available deploy. + coreLockPath = "core/lock" + + // lockRetryInterval is the interval we re-attempt to acquire the + // HA lock if an error is encountered + lockRetryInterval = 10 * time.Second ) var ( @@ -96,6 +104,9 @@ func (e *ErrInvalidKey) Error() string { // interface for API handlers and is responsible for managing the logical and physical // backends, router, security barrier, and audit trails. type Core struct { + // HABackend may be available depending on the physical backend + ha physical.HABackend + // physical backend is the un-trusted backend with durable data physical physical.Backend @@ -118,6 +129,10 @@ type Core struct { stateLock sync.RWMutex sealed bool + active bool + standbyDoneCh chan struct{} + standbyStopCh chan struct{} + // unlockParts has the keys provided to Unseal until // the threshold number of parts is available. unlockParts [][]byte @@ -173,6 +188,12 @@ type CoreConfig struct { // NewCore isk used to construct a new core func NewCore(conf *CoreConfig) (*Core, error) { + // Check if this backend supports an HA configuraiton + var haBackend physical.HABackend + if ha, ok := conf.Physical.(physical.HABackend); ok { + haBackend = ha + } + // Wrap the backend in a cache unless disabled if !conf.DisableCache { _, isCache := conf.Physical.(*physical.Cache) @@ -196,10 +217,12 @@ func NewCore(conf *CoreConfig) (*Core, error) { // Setup the core c := &Core{ + ha: haBackend, physical: conf.Physical, barrier: barrier, router: NewRouter(), sealed: true, + active: false, logger: conf.Logger, } @@ -695,15 +718,22 @@ func (c *Core) Unseal(key []byte) (bool, error) { } c.logger.Printf("[INFO] core: vault is unsealed") - // Do post-unseal setup - c.logger.Printf("[INFO] core: post-unseal setup starting") - if err := c.postUnseal(); err != nil { - c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err) - c.barrier.Seal() - c.logger.Printf("[WARN] core: vault is sealed") - return false, err + // Do post-unseal setup if HA is not enabled + if c.ha == nil { + c.active = true + if err := c.postUnseal(); err != nil { + c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err) + c.barrier.Seal() + c.logger.Printf("[WARN] core: vault is sealed") + return false, err + } + } else { + // Go to standby mode, wait until we are active to unseal + c.logger.Printf("[INFO] core: HA backend configured, enabling standby") + c.standbyDoneCh = make(chan struct{}) + c.standbyStopCh = make(chan struct{}) + go c.standby(c.standbyDoneCh, c.standbyStopCh) } - c.logger.Printf("[INFO] core: post-unseal setup complete") // Success! c.sealed = false @@ -726,15 +756,24 @@ func (c *Core) Seal(token string) error { return err } + // Enable that we are sealed to prevent furthur transactions c.sealed = true - // Do pre-seal teardown - c.logger.Printf("[INFO] core: pre-seal teardown starting") - if err := c.preSeal(); err != nil { - c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err) - return fmt.Errorf("internal error") + // Do pre-seal teardown if HA is not enabled + if c.ha == nil { + if err := c.preSeal(); err != nil { + c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err) + return fmt.Errorf("internal error") + } + } else { + // Signal the standby goroutine to shutdown, wait for completion + close(c.standbyStopCh) + + // Release the lock while we wait to avoid deadlocking + c.stateLock.Unlock() + <-c.standbyDoneCh + c.stateLock.Lock() } - c.logger.Printf("[INFO] core: pre-seal teardown complete") if err := c.barrier.Seal(); err != nil { return err @@ -749,6 +788,7 @@ func (c *Core) Seal(token string) error { // credential stores, etc. func (c *Core) postUnseal() error { defer metrics.MeasureSince([]string{"core", "post_unseal"}, time.Now()) + c.logger.Printf("[INFO] core: post-unseal setup starting") if cache, ok := c.physical.(*physical.Cache); ok { cache.Purge() } @@ -781,6 +821,7 @@ func (c *Core) postUnseal() error { } c.metricsCh = make(chan struct{}) go c.emitMetrics(c.metricsCh) + c.logger.Printf("[INFO] core: post-unseal setup complete") return nil } @@ -788,6 +829,7 @@ func (c *Core) postUnseal() error { // for any state teardown required. func (c *Core) preSeal() error { defer metrics.MeasureSince([]string{"core", "pre_seal"}, time.Now()) + c.logger.Printf("[INFO] core: pre-seal teardown starting") if c.metricsCh != nil { close(c.metricsCh) c.metricsCh = nil @@ -813,9 +855,98 @@ func (c *Core) preSeal() error { if cache, ok := c.physical.(*physical.Cache); ok { cache.Purge() } + c.logger.Printf("[INFO] core: pre-seal teardown complete") return nil } +// Standby is a long running routine that is used when an HA backend +// is enabled. It waits until we are leader and switches this Vault to +// active. +func (c *Core) standby(doneCh, stopCh chan struct{}) { + defer close(doneCh) + for { + // Check for a shutdown + select { + case <-stopCh: + return + default: + } + + // Create a lock + lock, err := c.ha.LockWith(coreLockPath) + if err != nil { + c.logger.Printf("[ERR] core: failed to create lock: %v", err) + return + } + + // Attempt the acquisition + leaderCh := c.acquireLock(lock, stopCh) + + // Bail if we are being shutdown + if leaderCh == nil { + return + } + c.logger.Printf("[INFO] core: acquired lock, enabling active operation") + + // Attempt the post-unseal process + c.stateLock.Lock() + err = c.postUnseal() + if err == nil { + c.active = true + } + c.stateLock.Unlock() + + // Handle a failure to unseal + if err != nil { + c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err) + lock.Unlock() + continue + } + + // Monitor a loss of leadership + select { + case <-leaderCh: + c.logger.Printf("[WARN] core: leadership lost, stopping active operation") + case <-stopCh: + c.logger.Printf("[WARN] core: stopping active operation") + } + + // Attempt the pre-seal process + c.stateLock.Lock() + c.active = false + err = c.preSeal() + c.stateLock.Unlock() + + // Give up leadership + lock.Unlock() + + // Check for a failure to prepare to seal + if err := c.preSeal(); err != nil { + c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err) + continue + } + } +} + +// acquireLock blocks until the lock is acquired, returning the leaderCh +func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan struct{} { + for { + // Attempt lock acquisition + leaderCh, err := lock.Lock(stopCh) + if err == nil { + return leaderCh + } + + // Retry the acquisition + c.logger.Printf("[ERR] core: failed to acquire lock: %v", err) + select { + case <-time.After(lockRetryInterval): + case <-stopCh: + return nil + } + } +} + // emitMetrics is used to periodically expose metrics while runnig func (c *Core) emitMetrics(stopCh chan struct{}) { for { From fd045f86c3e25adb89a1e80e242c4c7ce43dcac2 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 14:09:11 -0700 Subject: [PATCH 05/11] vault: reject operation if standby --- vault/core.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/vault/core.go b/vault/core.go index 0a478bdf7d..5f0eb1026b 100644 --- a/vault/core.go +++ b/vault/core.go @@ -39,6 +39,10 @@ var ( // a sealed barrier. No operation is expected to succeed before unsealing ErrSealed = errors.New("Vault is sealed") + // ErrStandby is returned if an operation is performed on + // a standby Vault. No operation is expected to succeed until active. + ErrStandby = errors.New("Vault is in standby mode") + // ErrAlreadyInit is returned if the core is already // initialized. This prevents a re-initialization. ErrAlreadyInit = errors.New("Vault is already initialized") @@ -261,6 +265,9 @@ func (c *Core) HandleRequest(req *logical.Request) (*logical.Response, error) { if c.sealed { return nil, ErrSealed } + if !c.active { + return nil, ErrStandby + } if c.router.LoginPath(req.Path) { return c.handleLoginRequest(req) @@ -605,6 +612,13 @@ func (c *Core) Sealed() (bool, error) { return c.sealed, nil } +// Standby checks if the Vault is in standby mode +func (c *Core) Standby() (bool, error) { + c.stateLock.RLock() + defer c.stateLock.RUnlock() + return !c.active, nil +} + // SealConfiguration is used to return information // about the configuration of the Vault and it's current // status. From b395bce1d97069e5da9ea1f4b1aff0b40226998c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:06:58 -0700 Subject: [PATCH 06/11] vault: testing standby mode --- vault/core.go | 2 +- vault/core_test.go | 125 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/vault/core.go b/vault/core.go index 5f0eb1026b..871bc76b14 100644 --- a/vault/core.go +++ b/vault/core.go @@ -743,7 +743,6 @@ func (c *Core) Unseal(key []byte) (bool, error) { } } else { // Go to standby mode, wait until we are active to unseal - c.logger.Printf("[INFO] core: HA backend configured, enabling standby") c.standbyDoneCh = make(chan struct{}) c.standbyStopCh = make(chan struct{}) go c.standby(c.standbyDoneCh, c.standbyStopCh) @@ -878,6 +877,7 @@ func (c *Core) preSeal() error { // active. func (c *Core) standby(doneCh, stopCh chan struct{}) { defer close(doneCh) + c.logger.Printf("[INFO] core: entering standby mode") for { // Check for a shutdown select { diff --git a/vault/core_test.go b/vault/core_test.go index 453fa36aac..ed2f91007f 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -983,3 +983,128 @@ func TestCore_HandleRequest_CreateToken_Lease(t *testing.T) { t.Fatalf("bad: %#v", resp.Auth) } } + +func TestCore_Standby(t *testing.T) { + // Create the first core and initialize it + inm := physical.NewInmemHA() + core, err := NewCore(&CoreConfig{Physical: inm}) + if err != nil { + t.Fatalf("err: %v", err) + } + key, root := TestCoreInit(t, core) + if _, err := core.Unseal(TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + + // Verify unsealed + sealed, err := core.Sealed() + if err != nil { + t.Fatalf("err checking seal status: %s", err) + } + if sealed { + t.Fatal("should not be sealed") + } + + // Wait for core to become active + start := time.Now() + var standby bool + for time.Now().Sub(start) < time.Second { + standby, err = core.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + break + } + } + if standby { + t.Fatalf("should not be in standby mode") + } + + // Put a secret + req := &logical.Request{ + Operation: logical.WriteOperation, + Path: "secret/foo", + Data: map[string]interface{}{ + "foo": "bar", + }, + ClientToken: root, + } + _, err = core.HandleRequest(req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a second core, attached to same in-memory store + core2, err := NewCore(&CoreConfig{Physical: inm}) + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err := core2.Unseal(TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + + // Verify unsealed + sealed, err = core2.Sealed() + if err != nil { + t.Fatalf("err checking seal status: %s", err) + } + if sealed { + t.Fatal("should not be sealed") + } + + // Core2 should be in standby + standby, err = core2.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + t.Fatalf("should be standby") + } + + // Seal the first core, should step down + err = core.Seal(root) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Core should be in standby + standby, err = core.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + t.Fatalf("should be standby") + } + + // Wait for core2 to become active + start = time.Now() + for time.Now().Sub(start) < time.Second { + standby, err = core2.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + break + } + } + if standby { + t.Fatalf("should not be in standby mode") + } + + // Read the secret + req = &logical.Request{ + Operation: logical.ReadOperation, + Path: "secret/foo", + ClientToken: root, + } + resp, err := core2.HandleRequest(req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Verify the response + if resp.Data["foo"] != "bar" { + t.Fatalf("bad: %#v", resp) + } +} From ceab0a5031703ffef5dfeb246dcf14058ff81344 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:08:14 -0700 Subject: [PATCH 07/11] vault: testing standby mode --- vault/core_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/vault/core_test.go b/vault/core_test.go index ed2f91007f..3fd41e7cf2 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -1062,6 +1062,12 @@ func TestCore_Standby(t *testing.T) { t.Fatalf("should be standby") } + // Request should fail in standby mode + _, err = core2.HandleRequest(req) + if err != ErrStandby { + t.Fatalf("err: %v", err) + } + // Seal the first core, should step down err = core.Seal(root) if err != nil { From 7cfaf64315d2795b4ee12435a41bcefe8159b441 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:11:39 -0700 Subject: [PATCH 08/11] vault: rename internal variable --- vault/core.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/vault/core.go b/vault/core.go index 871bc76b14..1d21577f34 100644 --- a/vault/core.go +++ b/vault/core.go @@ -133,7 +133,7 @@ type Core struct { stateLock sync.RWMutex sealed bool - active bool + standby bool standbyDoneCh chan struct{} standbyStopCh chan struct{} @@ -226,7 +226,7 @@ func NewCore(conf *CoreConfig) (*Core, error) { barrier: barrier, router: NewRouter(), sealed: true, - active: false, + standby: true, logger: conf.Logger, } @@ -265,7 +265,7 @@ func (c *Core) HandleRequest(req *logical.Request) (*logical.Response, error) { if c.sealed { return nil, ErrSealed } - if !c.active { + if c.standby { return nil, ErrStandby } @@ -616,7 +616,7 @@ func (c *Core) Sealed() (bool, error) { func (c *Core) Standby() (bool, error) { c.stateLock.RLock() defer c.stateLock.RUnlock() - return !c.active, nil + return c.standby, nil } // SealConfiguration is used to return information @@ -734,7 +734,7 @@ func (c *Core) Unseal(key []byte) (bool, error) { // Do post-unseal setup if HA is not enabled if c.ha == nil { - c.active = true + c.standby = false if err := c.postUnseal(); err != nil { c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err) c.barrier.Seal() @@ -745,7 +745,7 @@ func (c *Core) Unseal(key []byte) (bool, error) { // Go to standby mode, wait until we are active to unseal c.standbyDoneCh = make(chan struct{}) c.standbyStopCh = make(chan struct{}) - go c.standby(c.standbyDoneCh, c.standbyStopCh) + go c.runStandby(c.standbyDoneCh, c.standbyStopCh) } // Success! @@ -872,10 +872,10 @@ func (c *Core) preSeal() error { return nil } -// Standby is a long running routine that is used when an HA backend +// runStandby is a long running routine that is used when an HA backend // is enabled. It waits until we are leader and switches this Vault to // active. -func (c *Core) standby(doneCh, stopCh chan struct{}) { +func (c *Core) runStandby(doneCh, stopCh chan struct{}) { defer close(doneCh) c.logger.Printf("[INFO] core: entering standby mode") for { @@ -906,7 +906,7 @@ func (c *Core) standby(doneCh, stopCh chan struct{}) { c.stateLock.Lock() err = c.postUnseal() if err == nil { - c.active = true + c.standby = false } c.stateLock.Unlock() @@ -927,7 +927,7 @@ func (c *Core) standby(doneCh, stopCh chan struct{}) { // Attempt the pre-seal process c.stateLock.Lock() - c.active = false + c.standby = true err = c.preSeal() c.stateLock.Unlock() From db309326c8a6f5cb114e042f01be4f79720f91f3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:36:53 -0700 Subject: [PATCH 09/11] physical: Support association of value with lock --- physical/consul.go | 39 ++++++++++++++++++++++++-- physical/inmem_ha.go | 58 +++++++++++++++++++++++++++------------ physical/physical.go | 5 +++- physical/physical_test.go | 28 +++++++++++++++++-- 4 files changed, 107 insertions(+), 23 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index 4c8c1886b5..1dc8901edd 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -113,15 +113,50 @@ func (c *ConsulBackend) List(prefix string) ([]string, error) { } // Lock is used for mutual exclusion based on the given key. -func (c *ConsulBackend) LockWith(key string) (Lock, error) { +func (c *ConsulBackend) LockWith(key, value string) (Lock, error) { // Create the lock opts := &api.LockOptions{ Key: key, + Value: []byte(value), SessionName: "Vault Lock", } lock, err := c.client.LockOpts(opts) if err != nil { return nil, fmt.Errorf("failed to create lock: %v", err) } - return lock, nil + cl := &ConsulLock{ + client: c.client, + key: key, + lock: lock, + } + return cl, nil +} + +// ConsulLock is used to provide the Lock interface backed by Consul +type ConsulLock struct { + client *api.Client + key string + lock *api.Lock +} + +func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + return c.lock.Lock(stopCh) +} + +func (c *ConsulLock) Unlock() error { + return c.lock.Unlock() +} + +func (c *ConsulLock) Value() (bool, string, error) { + kv := c.client.KV() + pair, _, err := kv.Get(c.key, nil) + if err != nil { + return false, "", err + } + if pair == nil { + return false, "", nil + } + held := pair.Session != "" + value := string(pair.Value) + return held, value, nil } diff --git a/physical/inmem_ha.go b/physical/inmem_ha.go index 895c7e4c0b..92ee314f77 100644 --- a/physical/inmem_ha.go +++ b/physical/inmem_ha.go @@ -7,38 +7,36 @@ import ( type InmemHABackend struct { InmemBackend - locks map[string]*sync.Mutex + locks map[string]string l sync.Mutex + cond *sync.Cond } // NewInmemHA constructs a new in-memory HA backend. This is only for testing. func NewInmemHA() *InmemHABackend { in := &InmemHABackend{ InmemBackend: *NewInmem(), - locks: make(map[string]*sync.Mutex), + locks: make(map[string]string), } + in.cond = sync.NewCond(&in.l) return in } // LockWith is used for mutual exclusion based on the given key. -func (i *InmemHABackend) LockWith(key string) (Lock, error) { - i.l.Lock() - defer i.l.Unlock() - - mutex, ok := i.locks[key] - if !ok { - mutex = new(sync.Mutex) - i.locks[key] = mutex +func (i *InmemHABackend) LockWith(key, value string) (Lock, error) { + l := &InmemLock{ + in: i, + key: key, + value: value, } - - return &InmemLock{mutex: mutex}, nil + return l, nil } // InmemLock is an in-memory Lock implementation for the HABackend type InmemLock struct { - // mutex is the underlying mutex, may be shared between - // instances of InmemLock - mutex *sync.Mutex + in *InmemHABackend + key string + value string held bool leaderCh chan struct{} @@ -56,13 +54,26 @@ func (i *InmemLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { didLock := make(chan struct{}) releaseCh := make(chan bool, 1) go func() { - i.mutex.Lock() + // Wait to acquire the lock + i.in.l.Lock() + _, ok := i.in.locks[i.key] + for ok { + i.in.cond.Wait() + _, ok = i.in.locks[i.key] + } + i.in.locks[i.key] = i.value + i.in.l.Unlock() + + // Signal that lock is held close(didLock) // Handle an early abort release := <-releaseCh if release { - i.mutex.Unlock() + i.in.l.Lock() + delete(i.in.locks, i.key) + i.in.l.Unlock() + i.in.cond.Broadcast() } }() @@ -92,6 +103,17 @@ func (i *InmemLock) Unlock() error { close(i.leaderCh) i.leaderCh = nil i.held = false - i.mutex.Unlock() + + i.in.l.Lock() + delete(i.in.locks, i.key) + i.in.l.Unlock() + i.in.cond.Broadcast() return nil } + +func (i *InmemLock) Value() (bool, string, error) { + i.in.l.Lock() + val, ok := i.in.locks[i.key] + i.in.l.Unlock() + return ok, val, nil +} diff --git a/physical/physical.go b/physical/physical.go index 47c4057c2d..51b607ac33 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -29,7 +29,7 @@ type Backend interface { // hot standby for a leader that services all requests. type HABackend interface { // LockWith is used for mutual exclusion based on the given key. - LockWith(key string) (Lock, error) + LockWith(key, value string) (Lock, error) } type Lock interface { @@ -41,6 +41,9 @@ type Lock interface { // Unlock is used to release the lock Unlock() error + + // Returns the value of the lock and if it is held + Value() (bool, string, error) } // Entry is used to represent data stored by the physical backend diff --git a/physical/physical_test.go b/physical/physical_test.go index 93e524dda2..f6fce1a11a 100644 --- a/physical/physical_test.go +++ b/physical/physical_test.go @@ -163,7 +163,7 @@ func testBackend_ListPrefix(t *testing.T, b Backend) { func testHABackend(t *testing.T, b HABackend, b2 HABackend) { // Get the lock - lock, err := b.LockWith("foo") + lock, err := b.LockWith("foo", "bar") if err != nil { t.Fatalf("err: %v", err) } @@ -177,8 +177,20 @@ func testHABackend(t *testing.T, b HABackend, b2 HABackend) { t.Fatalf("failed to get leader ch") } + // Check the value + held, val, err := lock.Value() + if err != nil { + t.Fatalf("err: %v", err) + } + if !held { + t.Fatalf("should be held") + } + if val != "bar" { + t.Fatalf("bad value: %v", err) + } + // Second acquisition should fail - lock2, err := b2.LockWith("foo") + lock2, err := b2.LockWith("foo", "baz") if err != nil { t.Fatalf("err: %v", err) } @@ -209,4 +221,16 @@ func testHABackend(t *testing.T, b HABackend, b2 HABackend) { if leaderCh2 == nil { t.Fatalf("should get leader ch") } + + // Check the value + held, val, err = lock.Value() + if err != nil { + t.Fatalf("err: %v", err) + } + if !held { + t.Fatalf("should be held") + } + if val != "baz" { + t.Fatalf("bad value: %v", err) + } } From 8454d0f942df4f2726b36750a4281bdf91798bb9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:44:48 -0700 Subject: [PATCH 10/11] vault: leader should advertise address --- vault/core.go | 60 ++++++++++++++++++++++++++++++++++++++-------- vault/core_test.go | 4 ++-- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/vault/core.go b/vault/core.go index 1d21577f34..75f91b8fdd 100644 --- a/vault/core.go +++ b/vault/core.go @@ -29,6 +29,10 @@ const ( // for a highly-available deploy. coreLockPath = "core/lock" + // coreLeaderPrefix is the prefix used for the UUID that contains + // the currently elected leader. + coreLeaderPrefix = "core/leader/" + // lockRetryInterval is the interval we re-attempt to acquire the // HA lock if an error is encountered lockRetryInterval = 10 * time.Second @@ -111,6 +115,9 @@ type Core struct { // HABackend may be available depending on the physical backend ha physical.HABackend + // AdvertiseAddr is the address we advertise as leader if held + advertiseAddr string + // physical backend is the un-trusted backend with durable data physical physical.Backend @@ -186,8 +193,9 @@ type CoreConfig struct { AuditBackends map[string]audit.Factory Physical physical.Backend Logger *log.Logger - DisableCache bool // Disables the LRU cache on the physical backend - CacheSize int // Custom cache size of zero for default + DisableCache bool // Disables the LRU cache on the physical backend + CacheSize int // Custom cache size of zero for default + AdvertiseAddr string // Set as the leader address for HA } // NewCore isk used to construct a new core @@ -197,6 +205,9 @@ func NewCore(conf *CoreConfig) (*Core, error) { if ha, ok := conf.Physical.(physical.HABackend); ok { haBackend = ha } + if haBackend != nil && conf.AdvertiseAddr == "" { + return nil, fmt.Errorf("missing advertisement address") + } // Wrap the backend in a cache unless disabled if !conf.DisableCache { @@ -221,13 +232,14 @@ func NewCore(conf *CoreConfig) (*Core, error) { // Setup the core c := &Core{ - ha: haBackend, - physical: conf.Physical, - barrier: barrier, - router: NewRouter(), - sealed: true, - standby: true, - logger: conf.Logger, + ha: haBackend, + advertiseAddr: conf.AdvertiseAddr, + physical: conf.Physical, + barrier: barrier, + router: NewRouter(), + sealed: true, + standby: true, + logger: conf.Logger, } // Setup the backends @@ -887,7 +899,8 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) { } // Create a lock - lock, err := c.ha.LockWith(coreLockPath) + uuid := generateUUID() + lock, err := c.ha.LockWith(coreLockPath, uuid) if err != nil { c.logger.Printf("[ERR] core: failed to create lock: %v", err) return @@ -902,6 +915,13 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) { } c.logger.Printf("[INFO] core: acquired lock, enabling active operation") + // Advertise ourself as leader + if err := c.advertiseLeader(uuid); err != nil { + c.logger.Printf("[ERR] core: leader advertisement setup failed: %v", err) + lock.Unlock() + continue + } + // Attempt the post-unseal process c.stateLock.Lock() err = c.postUnseal() @@ -925,6 +945,11 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) { c.logger.Printf("[WARN] core: stopping active operation") } + // Clear ourself as leader + if err := c.clearLeader(uuid); err != nil { + c.logger.Printf("[ERR] core: clearing leader advertisement failed: %v", err) + } + // Attempt the pre-seal process c.stateLock.Lock() c.standby = true @@ -961,6 +986,21 @@ func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan st } } +// advertiseLeader is used to advertise the current node as leader +func (c *Core) advertiseLeader(uuid string) error { + ent := &Entry{ + Key: coreLeaderPrefix + uuid, + Value: []byte(c.advertiseAddr), + } + return c.barrier.Put(ent) +} + +// clearLeader is used to clear our leadership entry +func (c *Core) clearLeader(uuid string) error { + key := coreLeaderPrefix + uuid + return c.barrier.Delete(key) +} + // emitMetrics is used to periodically expose metrics while runnig func (c *Core) emitMetrics(stopCh chan struct{}) { for { diff --git a/vault/core_test.go b/vault/core_test.go index 3fd41e7cf2..f99ee75c7b 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -987,7 +987,7 @@ func TestCore_HandleRequest_CreateToken_Lease(t *testing.T) { func TestCore_Standby(t *testing.T) { // Create the first core and initialize it inm := physical.NewInmemHA() - core, err := NewCore(&CoreConfig{Physical: inm}) + core, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "foo"}) if err != nil { t.Fatalf("err: %v", err) } @@ -1036,7 +1036,7 @@ func TestCore_Standby(t *testing.T) { } // Create a second core, attached to same in-memory store - core2, err := NewCore(&CoreConfig{Physical: inm}) + core2, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "bar"}) if err != nil { t.Fatalf("err: %v", err) } From b8a3a76a6b2dc2725c20b56e02c798b7245049d7 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:53:40 -0700 Subject: [PATCH 11/11] vault: expose the current leader --- vault/core.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++ vault/core_test.go | 36 ++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/vault/core.go b/vault/core.go index 75f91b8fdd..2745a44771 100644 --- a/vault/core.go +++ b/vault/core.go @@ -58,6 +58,10 @@ var ( // ErrInternalError is returned when we don't want to leak // any information about an internal error ErrInternalError = errors.New("internal error") + + // ErrHANotEnabled is returned if the operation only makes sense + // in an HA setting + ErrHANotEnabled = errors.New("Vault is not configured for highly-available mode") ) // SealConfig is used to describe the seal configuration @@ -631,6 +635,54 @@ func (c *Core) Standby() (bool, error) { return c.standby, nil } +// Leader is used to get the current active leader +func (c *Core) Leader() (bool, string, error) { + c.stateLock.RLock() + defer c.stateLock.RUnlock() + // Check if sealed + if c.sealed { + return false, "", ErrSealed + } + + // Check if HA enabled + if c.ha == nil { + return false, "", ErrHANotEnabled + } + + // Check if we are the leader + if !c.standby { + return true, c.advertiseAddr, nil + } + + // Initialize a lock + lock, err := c.ha.LockWith(coreLockPath, "read") + if err != nil { + return false, "", err + } + + // Read the value + held, value, err := lock.Value() + if err != nil { + return false, "", err + } + if !held { + return false, "", nil + } + + // Value is the UUID of the leader, fetch the key + key := coreLeaderPrefix + value + entry, err := c.barrier.Get(key) + if err != nil { + return false, "", err + } + if entry == nil { + return false, "", nil + } + + // Leader address is in the entry + return false, string(entry.Value), nil +} + // SealConfiguration is used to return information // about the configuration of the Vault and it's current // status. diff --git a/vault/core_test.go b/vault/core_test.go index f99ee75c7b..8eeed93922 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -1035,6 +1035,18 @@ func TestCore_Standby(t *testing.T) { t.Fatalf("err: %v", err) } + // Check the leader is local + isLeader, advertise, err := core.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if !isLeader { + t.Fatalf("should be leader") + } + if advertise != "foo" { + t.Fatalf("Bad advertise: %v", advertise) + } + // Create a second core, attached to same in-memory store core2, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "bar"}) if err != nil { @@ -1068,6 +1080,18 @@ func TestCore_Standby(t *testing.T) { t.Fatalf("err: %v", err) } + // Check the leader is not local + isLeader, advertise, err = core2.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if isLeader { + t.Fatalf("should not be leader") + } + if advertise != "foo" { + t.Fatalf("Bad advertise: %v", advertise) + } + // Seal the first core, should step down err = core.Seal(root) if err != nil { @@ -1113,4 +1137,16 @@ func TestCore_Standby(t *testing.T) { if resp.Data["foo"] != "bar" { t.Fatalf("bad: %#v", resp) } + + // Check the leader is local + isLeader, advertise, err = core2.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if !isLeader { + t.Fatalf("should be leader") + } + if advertise != "bar" { + t.Fatalf("Bad advertise: %v", advertise) + } }