Merge pull request #5 from hashicorp/f-ha

Support for High-Availability mode based on Backends
This commit is contained in:
Armon Dadgar 2015-04-14 16:59:35 -07:00
commit 310e1c503d
9 changed files with 710 additions and 24 deletions

View file

@ -111,3 +111,52 @@ func (c *ConsulBackend) List(prefix string) ([]string, error) {
sort.Strings(out)
return out, err
}
// Lock is used for mutual exclusion based on the given key.
func (c *ConsulBackend) LockWith(key, value string) (Lock, error) {
// Create the lock
opts := &api.LockOptions{
Key: key,
Value: []byte(value),
SessionName: "Vault Lock",
}
lock, err := c.client.LockOpts(opts)
if err != nil {
return nil, fmt.Errorf("failed to create lock: %v", err)
}
cl := &ConsulLock{
client: c.client,
key: key,
lock: lock,
}
return cl, nil
}
// ConsulLock is used to provide the Lock interface backed by Consul
type ConsulLock struct {
client *api.Client
key string
lock *api.Lock
}
func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
return c.lock.Lock(stopCh)
}
func (c *ConsulLock) Unlock() error {
return c.lock.Unlock()
}
func (c *ConsulLock) Value() (bool, string, error) {
kv := c.client.KV()
pair, _, err := kv.Get(c.key, nil)
if err != nil {
return false, "", err
}
if pair == nil {
return false, "", nil
}
held := pair.Session != ""
value := string(pair.Value)
return held, value, nil
}

View file

@ -37,4 +37,10 @@ func TestConsulBackend(t *testing.T) {
testBackend(t, b)
testBackend_ListPrefix(t, b)
ha, ok := b.(HABackend)
if !ok {
t.Fatalf("consul does not implement HABackend")
}
testHABackend(t, ha, ha)
}

119
physical/inmem_ha.go Normal file
View file

@ -0,0 +1,119 @@
package physical
import (
"fmt"
"sync"
)
type InmemHABackend struct {
InmemBackend
locks map[string]string
l sync.Mutex
cond *sync.Cond
}
// NewInmemHA constructs a new in-memory HA backend. This is only for testing.
func NewInmemHA() *InmemHABackend {
in := &InmemHABackend{
InmemBackend: *NewInmem(),
locks: make(map[string]string),
}
in.cond = sync.NewCond(&in.l)
return in
}
// LockWith is used for mutual exclusion based on the given key.
func (i *InmemHABackend) LockWith(key, value string) (Lock, error) {
l := &InmemLock{
in: i,
key: key,
value: value,
}
return l, nil
}
// InmemLock is an in-memory Lock implementation for the HABackend
type InmemLock struct {
in *InmemHABackend
key string
value string
held bool
leaderCh chan struct{}
l sync.Mutex
}
func (i *InmemLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
i.l.Lock()
defer i.l.Unlock()
if i.held {
return nil, fmt.Errorf("lock already held")
}
// Attempt an async acquisition
didLock := make(chan struct{})
releaseCh := make(chan bool, 1)
go func() {
// Wait to acquire the lock
i.in.l.Lock()
_, ok := i.in.locks[i.key]
for ok {
i.in.cond.Wait()
_, ok = i.in.locks[i.key]
}
i.in.locks[i.key] = i.value
i.in.l.Unlock()
// Signal that lock is held
close(didLock)
// Handle an early abort
release := <-releaseCh
if release {
i.in.l.Lock()
delete(i.in.locks, i.key)
i.in.l.Unlock()
i.in.cond.Broadcast()
}
}()
// Wait for lock acquisition or shutdown
select {
case <-didLock:
releaseCh <- false
case <-stopCh:
releaseCh <- true
return nil, nil
}
// Create the leader channel
i.held = true
i.leaderCh = make(chan struct{})
return i.leaderCh, nil
}
func (i *InmemLock) Unlock() error {
i.l.Lock()
defer i.l.Unlock()
if !i.held {
return nil
}
close(i.leaderCh)
i.leaderCh = nil
i.held = false
i.in.l.Lock()
delete(i.in.locks, i.key)
i.in.l.Unlock()
i.in.cond.Broadcast()
return nil
}
func (i *InmemLock) Value() (bool, string, error) {
i.in.l.Lock()
val, ok := i.in.locks[i.key]
i.in.l.Unlock()
return ok, val, nil
}

View file

@ -0,0 +1,8 @@
package physical
import "testing"
func TestInmemHA(t *testing.T) {
inm := NewInmemHA()
testHABackend(t, inm, inm)
}

View file

@ -23,6 +23,29 @@ type Backend interface {
List(prefix string) ([]string, error)
}
// HABackend is an extentions to the standard physical
// backend to support high-availability. Vault only expects to
// use mutual exclusion to allow multiple instances to act as a
// hot standby for a leader that services all requests.
type HABackend interface {
// LockWith is used for mutual exclusion based on the given key.
LockWith(key, value string) (Lock, error)
}
type Lock interface {
// Lock is used to acquire the given lock
// The stopCh is optional and if closed should interrupt the lock
// acquisition attempt. The return struct should be closed when
// leadership is lost.
Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
// Unlock is used to release the lock
Unlock() error
// Returns the value of the lock and if it is held
Value() (bool, string, error)
}
// Entry is used to represent data stored by the physical backend
type Entry struct {
Key string

View file

@ -3,6 +3,7 @@ package physical
import (
"reflect"
"testing"
"time"
)
func testNewBackend(t *testing.T) {
@ -159,3 +160,77 @@ func testBackend_ListPrefix(t *testing.T, b Backend) {
t.Fatalf("bad: %v", keys)
}
}
func testHABackend(t *testing.T, b HABackend, b2 HABackend) {
// Get the lock
lock, err := b.LockWith("foo", "bar")
if err != nil {
t.Fatalf("err: %v", err)
}
// Attempt to lock
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("failed to get leader ch")
}
// Check the value
held, val, err := lock.Value()
if err != nil {
t.Fatalf("err: %v", err)
}
if !held {
t.Fatalf("should be held")
}
if val != "bar" {
t.Fatalf("bad value: %v", err)
}
// Second acquisition should fail
lock2, err := b2.LockWith("foo", "baz")
if err != nil {
t.Fatalf("err: %v", err)
}
// Cancel attempt in 50 msec
stopCh := make(chan struct{})
time.AfterFunc(50*time.Millisecond, func() {
close(stopCh)
})
// Attempt to lock
leaderCh2, err := lock2.Lock(stopCh)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh2 != nil {
t.Fatalf("should not get leader ch")
}
// Release the first lock
lock.Unlock()
// Attempt to lock should work
leaderCh2, err = lock2.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh2 == nil {
t.Fatalf("should get leader ch")
}
// Check the value
held, val, err = lock.Value()
if err != nil {
t.Fatalf("err: %v", err)
}
if !held {
t.Fatalf("should be held")
}
if val != "baz" {
t.Fatalf("bad value: %v", err)
}
}

View file

@ -24,6 +24,18 @@ const (
// it even with the Vault sealed. This is required so that we know
// how many secret parts must be used to reconstruct the master key.
coreSealConfigPath = "core/seal-config"
// coreLockPath is the path used to acquire a coordinating lock
// for a highly-available deploy.
coreLockPath = "core/lock"
// coreLeaderPrefix is the prefix used for the UUID that contains
// the currently elected leader.
coreLeaderPrefix = "core/leader/"
// lockRetryInterval is the interval we re-attempt to acquire the
// HA lock if an error is encountered
lockRetryInterval = 10 * time.Second
)
var (
@ -31,6 +43,10 @@ var (
// a sealed barrier. No operation is expected to succeed before unsealing
ErrSealed = errors.New("Vault is sealed")
// ErrStandby is returned if an operation is performed on
// a standby Vault. No operation is expected to succeed until active.
ErrStandby = errors.New("Vault is in standby mode")
// ErrAlreadyInit is returned if the core is already
// initialized. This prevents a re-initialization.
ErrAlreadyInit = errors.New("Vault is already initialized")
@ -42,6 +58,10 @@ var (
// ErrInternalError is returned when we don't want to leak
// any information about an internal error
ErrInternalError = errors.New("internal error")
// ErrHANotEnabled is returned if the operation only makes sense
// in an HA setting
ErrHANotEnabled = errors.New("Vault is not configured for highly-available mode")
)
// SealConfig is used to describe the seal configuration
@ -96,6 +116,12 @@ func (e *ErrInvalidKey) Error() string {
// interface for API handlers and is responsible for managing the logical and physical
// backends, router, security barrier, and audit trails.
type Core struct {
// HABackend may be available depending on the physical backend
ha physical.HABackend
// AdvertiseAddr is the address we advertise as leader if held
advertiseAddr string
// physical backend is the un-trusted backend with durable data
physical physical.Backend
@ -118,6 +144,10 @@ type Core struct {
stateLock sync.RWMutex
sealed bool
standby bool
standbyDoneCh chan struct{}
standbyStopCh chan struct{}
// unlockParts has the keys provided to Unseal until
// the threshold number of parts is available.
unlockParts [][]byte
@ -167,12 +197,22 @@ type CoreConfig struct {
AuditBackends map[string]audit.Factory
Physical physical.Backend
Logger *log.Logger
DisableCache bool // Disables the LRU cache on the physical backend
CacheSize int // Custom cache size of zero for default
DisableCache bool // Disables the LRU cache on the physical backend
CacheSize int // Custom cache size of zero for default
AdvertiseAddr string // Set as the leader address for HA
}
// NewCore isk used to construct a new core
func NewCore(conf *CoreConfig) (*Core, error) {
// Check if this backend supports an HA configuraiton
var haBackend physical.HABackend
if ha, ok := conf.Physical.(physical.HABackend); ok {
haBackend = ha
}
if haBackend != nil && conf.AdvertiseAddr == "" {
return nil, fmt.Errorf("missing advertisement address")
}
// Wrap the backend in a cache unless disabled
if !conf.DisableCache {
_, isCache := conf.Physical.(*physical.Cache)
@ -196,11 +236,14 @@ func NewCore(conf *CoreConfig) (*Core, error) {
// Setup the core
c := &Core{
physical: conf.Physical,
barrier: barrier,
router: NewRouter(),
sealed: true,
logger: conf.Logger,
ha: haBackend,
advertiseAddr: conf.AdvertiseAddr,
physical: conf.Physical,
barrier: barrier,
router: NewRouter(),
sealed: true,
standby: true,
logger: conf.Logger,
}
// Setup the backends
@ -238,6 +281,9 @@ func (c *Core) HandleRequest(req *logical.Request) (*logical.Response, error) {
if c.sealed {
return nil, ErrSealed
}
if c.standby {
return nil, ErrStandby
}
if c.router.LoginPath(req.Path) {
return c.handleLoginRequest(req)
@ -582,6 +628,61 @@ func (c *Core) Sealed() (bool, error) {
return c.sealed, nil
}
// Standby checks if the Vault is in standby mode
func (c *Core) Standby() (bool, error) {
c.stateLock.RLock()
defer c.stateLock.RUnlock()
return c.standby, nil
}
// Leader is used to get the current active leader
func (c *Core) Leader() (bool, string, error) {
c.stateLock.RLock()
defer c.stateLock.RUnlock()
// Check if sealed
if c.sealed {
return false, "", ErrSealed
}
// Check if HA enabled
if c.ha == nil {
return false, "", ErrHANotEnabled
}
// Check if we are the leader
if !c.standby {
return true, c.advertiseAddr, nil
}
// Initialize a lock
lock, err := c.ha.LockWith(coreLockPath, "read")
if err != nil {
return false, "", err
}
// Read the value
held, value, err := lock.Value()
if err != nil {
return false, "", err
}
if !held {
return false, "", nil
}
// Value is the UUID of the leader, fetch the key
key := coreLeaderPrefix + value
entry, err := c.barrier.Get(key)
if err != nil {
return false, "", err
}
if entry == nil {
return false, "", nil
}
// Leader address is in the entry
return false, string(entry.Value), nil
}
// SealConfiguration is used to return information
// about the configuration of the Vault and it's current
// status.
@ -695,15 +796,21 @@ func (c *Core) Unseal(key []byte) (bool, error) {
}
c.logger.Printf("[INFO] core: vault is unsealed")
// Do post-unseal setup
c.logger.Printf("[INFO] core: post-unseal setup starting")
if err := c.postUnseal(); err != nil {
c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err)
c.barrier.Seal()
c.logger.Printf("[WARN] core: vault is sealed")
return false, err
// Do post-unseal setup if HA is not enabled
if c.ha == nil {
c.standby = false
if err := c.postUnseal(); err != nil {
c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err)
c.barrier.Seal()
c.logger.Printf("[WARN] core: vault is sealed")
return false, err
}
} else {
// Go to standby mode, wait until we are active to unseal
c.standbyDoneCh = make(chan struct{})
c.standbyStopCh = make(chan struct{})
go c.runStandby(c.standbyDoneCh, c.standbyStopCh)
}
c.logger.Printf("[INFO] core: post-unseal setup complete")
// Success!
c.sealed = false
@ -726,15 +833,24 @@ func (c *Core) Seal(token string) error {
return err
}
// Enable that we are sealed to prevent furthur transactions
c.sealed = true
// Do pre-seal teardown
c.logger.Printf("[INFO] core: pre-seal teardown starting")
if err := c.preSeal(); err != nil {
c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err)
return fmt.Errorf("internal error")
// Do pre-seal teardown if HA is not enabled
if c.ha == nil {
if err := c.preSeal(); err != nil {
c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err)
return fmt.Errorf("internal error")
}
} else {
// Signal the standby goroutine to shutdown, wait for completion
close(c.standbyStopCh)
// Release the lock while we wait to avoid deadlocking
c.stateLock.Unlock()
<-c.standbyDoneCh
c.stateLock.Lock()
}
c.logger.Printf("[INFO] core: pre-seal teardown complete")
if err := c.barrier.Seal(); err != nil {
return err
@ -749,6 +865,7 @@ func (c *Core) Seal(token string) error {
// credential stores, etc.
func (c *Core) postUnseal() error {
defer metrics.MeasureSince([]string{"core", "post_unseal"}, time.Now())
c.logger.Printf("[INFO] core: post-unseal setup starting")
if cache, ok := c.physical.(*physical.Cache); ok {
cache.Purge()
}
@ -781,6 +898,7 @@ func (c *Core) postUnseal() error {
}
c.metricsCh = make(chan struct{})
go c.emitMetrics(c.metricsCh)
c.logger.Printf("[INFO] core: post-unseal setup complete")
return nil
}
@ -788,6 +906,7 @@ func (c *Core) postUnseal() error {
// for any state teardown required.
func (c *Core) preSeal() error {
defer metrics.MeasureSince([]string{"core", "pre_seal"}, time.Now())
c.logger.Printf("[INFO] core: pre-seal teardown starting")
if c.metricsCh != nil {
close(c.metricsCh)
c.metricsCh = nil
@ -813,9 +932,127 @@ func (c *Core) preSeal() error {
if cache, ok := c.physical.(*physical.Cache); ok {
cache.Purge()
}
c.logger.Printf("[INFO] core: pre-seal teardown complete")
return nil
}
// runStandby is a long running routine that is used when an HA backend
// is enabled. It waits until we are leader and switches this Vault to
// active.
func (c *Core) runStandby(doneCh, stopCh chan struct{}) {
defer close(doneCh)
c.logger.Printf("[INFO] core: entering standby mode")
for {
// Check for a shutdown
select {
case <-stopCh:
return
default:
}
// Create a lock
uuid := generateUUID()
lock, err := c.ha.LockWith(coreLockPath, uuid)
if err != nil {
c.logger.Printf("[ERR] core: failed to create lock: %v", err)
return
}
// Attempt the acquisition
leaderCh := c.acquireLock(lock, stopCh)
// Bail if we are being shutdown
if leaderCh == nil {
return
}
c.logger.Printf("[INFO] core: acquired lock, enabling active operation")
// Advertise ourself as leader
if err := c.advertiseLeader(uuid); err != nil {
c.logger.Printf("[ERR] core: leader advertisement setup failed: %v", err)
lock.Unlock()
continue
}
// Attempt the post-unseal process
c.stateLock.Lock()
err = c.postUnseal()
if err == nil {
c.standby = false
}
c.stateLock.Unlock()
// Handle a failure to unseal
if err != nil {
c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err)
lock.Unlock()
continue
}
// Monitor a loss of leadership
select {
case <-leaderCh:
c.logger.Printf("[WARN] core: leadership lost, stopping active operation")
case <-stopCh:
c.logger.Printf("[WARN] core: stopping active operation")
}
// Clear ourself as leader
if err := c.clearLeader(uuid); err != nil {
c.logger.Printf("[ERR] core: clearing leader advertisement failed: %v", err)
}
// Attempt the pre-seal process
c.stateLock.Lock()
c.standby = true
err = c.preSeal()
c.stateLock.Unlock()
// Give up leadership
lock.Unlock()
// Check for a failure to prepare to seal
if err := c.preSeal(); err != nil {
c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err)
continue
}
}
}
// acquireLock blocks until the lock is acquired, returning the leaderCh
func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan struct{} {
for {
// Attempt lock acquisition
leaderCh, err := lock.Lock(stopCh)
if err == nil {
return leaderCh
}
// Retry the acquisition
c.logger.Printf("[ERR] core: failed to acquire lock: %v", err)
select {
case <-time.After(lockRetryInterval):
case <-stopCh:
return nil
}
}
}
// advertiseLeader is used to advertise the current node as leader
func (c *Core) advertiseLeader(uuid string) error {
ent := &Entry{
Key: coreLeaderPrefix + uuid,
Value: []byte(c.advertiseAddr),
}
return c.barrier.Put(ent)
}
// clearLeader is used to clear our leadership entry
func (c *Core) clearLeader(uuid string) error {
key := coreLeaderPrefix + uuid
return c.barrier.Delete(key)
}
// emitMetrics is used to periodically expose metrics while runnig
func (c *Core) emitMetrics(stopCh chan struct{}) {
for {

View file

@ -983,3 +983,170 @@ func TestCore_HandleRequest_CreateToken_Lease(t *testing.T) {
t.Fatalf("bad: %#v", resp.Auth)
}
}
func TestCore_Standby(t *testing.T) {
// Create the first core and initialize it
inm := physical.NewInmemHA()
core, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "foo"})
if err != nil {
t.Fatalf("err: %v", err)
}
key, root := TestCoreInit(t, core)
if _, err := core.Unseal(TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
// Verify unsealed
sealed, err := core.Sealed()
if err != nil {
t.Fatalf("err checking seal status: %s", err)
}
if sealed {
t.Fatal("should not be sealed")
}
// Wait for core to become active
start := time.Now()
var standby bool
for time.Now().Sub(start) < time.Second {
standby, err = core.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
break
}
}
if standby {
t.Fatalf("should not be in standby mode")
}
// Put a secret
req := &logical.Request{
Operation: logical.WriteOperation,
Path: "secret/foo",
Data: map[string]interface{}{
"foo": "bar",
},
ClientToken: root,
}
_, err = core.HandleRequest(req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the leader is local
isLeader, advertise, err := core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if !isLeader {
t.Fatalf("should be leader")
}
if advertise != "foo" {
t.Fatalf("Bad advertise: %v", advertise)
}
// Create a second core, attached to same in-memory store
core2, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "bar"})
if err != nil {
t.Fatalf("err: %v", err)
}
if _, err := core2.Unseal(TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
// Verify unsealed
sealed, err = core2.Sealed()
if err != nil {
t.Fatalf("err checking seal status: %s", err)
}
if sealed {
t.Fatal("should not be sealed")
}
// Core2 should be in standby
standby, err = core2.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
t.Fatalf("should be standby")
}
// Request should fail in standby mode
_, err = core2.HandleRequest(req)
if err != ErrStandby {
t.Fatalf("err: %v", err)
}
// Check the leader is not local
isLeader, advertise, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if isLeader {
t.Fatalf("should not be leader")
}
if advertise != "foo" {
t.Fatalf("Bad advertise: %v", advertise)
}
// Seal the first core, should step down
err = core.Seal(root)
if err != nil {
t.Fatalf("err: %v", err)
}
// Core should be in standby
standby, err = core.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
t.Fatalf("should be standby")
}
// Wait for core2 to become active
start = time.Now()
for time.Now().Sub(start) < time.Second {
standby, err = core2.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
break
}
}
if standby {
t.Fatalf("should not be in standby mode")
}
// Read the secret
req = &logical.Request{
Operation: logical.ReadOperation,
Path: "secret/foo",
ClientToken: root,
}
resp, err := core2.HandleRequest(req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Verify the response
if resp.Data["foo"] != "bar" {
t.Fatalf("bad: %#v", resp)
}
// Check the leader is local
isLeader, advertise, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if !isLeader {
t.Fatalf("should be leader")
}
if advertise != "bar" {
t.Fatalf("Bad advertise: %v", advertise)
}
}

View file

@ -96,10 +96,12 @@ func (c *Core) setupExpiration() error {
// stopExpiration is used to stop the expiration manager before
// sealing the Vault.
func (c *Core) stopExpiration() error {
if err := c.expiration.Stop(); err != nil {
return err
if c.expiration != nil {
if err := c.expiration.Stop(); err != nil {
return err
}
c.expiration = nil
}
c.expiration = nil
return nil
}