From db309326c8a6f5cb114e042f01be4f79720f91f3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:36:53 -0700 Subject: [PATCH] physical: Support association of value with lock --- physical/consul.go | 39 ++++++++++++++++++++++++-- physical/inmem_ha.go | 58 +++++++++++++++++++++++++++------------ physical/physical.go | 5 +++- physical/physical_test.go | 28 +++++++++++++++++-- 4 files changed, 107 insertions(+), 23 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index 4c8c1886b5..1dc8901edd 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -113,15 +113,50 @@ func (c *ConsulBackend) List(prefix string) ([]string, error) { } // Lock is used for mutual exclusion based on the given key. -func (c *ConsulBackend) LockWith(key string) (Lock, error) { +func (c *ConsulBackend) LockWith(key, value string) (Lock, error) { // Create the lock opts := &api.LockOptions{ Key: key, + Value: []byte(value), SessionName: "Vault Lock", } lock, err := c.client.LockOpts(opts) if err != nil { return nil, fmt.Errorf("failed to create lock: %v", err) } - return lock, nil + cl := &ConsulLock{ + client: c.client, + key: key, + lock: lock, + } + return cl, nil +} + +// ConsulLock is used to provide the Lock interface backed by Consul +type ConsulLock struct { + client *api.Client + key string + lock *api.Lock +} + +func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + return c.lock.Lock(stopCh) +} + +func (c *ConsulLock) Unlock() error { + return c.lock.Unlock() +} + +func (c *ConsulLock) Value() (bool, string, error) { + kv := c.client.KV() + pair, _, err := kv.Get(c.key, nil) + if err != nil { + return false, "", err + } + if pair == nil { + return false, "", nil + } + held := pair.Session != "" + value := string(pair.Value) + return held, value, nil } diff --git a/physical/inmem_ha.go b/physical/inmem_ha.go index 895c7e4c0b..92ee314f77 100644 --- a/physical/inmem_ha.go +++ b/physical/inmem_ha.go @@ -7,38 +7,36 @@ import ( type InmemHABackend struct { InmemBackend - locks map[string]*sync.Mutex + locks map[string]string l sync.Mutex + cond *sync.Cond } // NewInmemHA constructs a new in-memory HA backend. This is only for testing. func NewInmemHA() *InmemHABackend { in := &InmemHABackend{ InmemBackend: *NewInmem(), - locks: make(map[string]*sync.Mutex), + locks: make(map[string]string), } + in.cond = sync.NewCond(&in.l) return in } // LockWith is used for mutual exclusion based on the given key. -func (i *InmemHABackend) LockWith(key string) (Lock, error) { - i.l.Lock() - defer i.l.Unlock() - - mutex, ok := i.locks[key] - if !ok { - mutex = new(sync.Mutex) - i.locks[key] = mutex +func (i *InmemHABackend) LockWith(key, value string) (Lock, error) { + l := &InmemLock{ + in: i, + key: key, + value: value, } - - return &InmemLock{mutex: mutex}, nil + return l, nil } // InmemLock is an in-memory Lock implementation for the HABackend type InmemLock struct { - // mutex is the underlying mutex, may be shared between - // instances of InmemLock - mutex *sync.Mutex + in *InmemHABackend + key string + value string held bool leaderCh chan struct{} @@ -56,13 +54,26 @@ func (i *InmemLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { didLock := make(chan struct{}) releaseCh := make(chan bool, 1) go func() { - i.mutex.Lock() + // Wait to acquire the lock + i.in.l.Lock() + _, ok := i.in.locks[i.key] + for ok { + i.in.cond.Wait() + _, ok = i.in.locks[i.key] + } + i.in.locks[i.key] = i.value + i.in.l.Unlock() + + // Signal that lock is held close(didLock) // Handle an early abort release := <-releaseCh if release { - i.mutex.Unlock() + i.in.l.Lock() + delete(i.in.locks, i.key) + i.in.l.Unlock() + i.in.cond.Broadcast() } }() @@ -92,6 +103,17 @@ func (i *InmemLock) Unlock() error { close(i.leaderCh) i.leaderCh = nil i.held = false - i.mutex.Unlock() + + i.in.l.Lock() + delete(i.in.locks, i.key) + i.in.l.Unlock() + i.in.cond.Broadcast() return nil } + +func (i *InmemLock) Value() (bool, string, error) { + i.in.l.Lock() + val, ok := i.in.locks[i.key] + i.in.l.Unlock() + return ok, val, nil +} diff --git a/physical/physical.go b/physical/physical.go index 47c4057c2d..51b607ac33 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -29,7 +29,7 @@ type Backend interface { // hot standby for a leader that services all requests. type HABackend interface { // LockWith is used for mutual exclusion based on the given key. - LockWith(key string) (Lock, error) + LockWith(key, value string) (Lock, error) } type Lock interface { @@ -41,6 +41,9 @@ type Lock interface { // Unlock is used to release the lock Unlock() error + + // Returns the value of the lock and if it is held + Value() (bool, string, error) } // Entry is used to represent data stored by the physical backend diff --git a/physical/physical_test.go b/physical/physical_test.go index 93e524dda2..f6fce1a11a 100644 --- a/physical/physical_test.go +++ b/physical/physical_test.go @@ -163,7 +163,7 @@ func testBackend_ListPrefix(t *testing.T, b Backend) { func testHABackend(t *testing.T, b HABackend, b2 HABackend) { // Get the lock - lock, err := b.LockWith("foo") + lock, err := b.LockWith("foo", "bar") if err != nil { t.Fatalf("err: %v", err) } @@ -177,8 +177,20 @@ func testHABackend(t *testing.T, b HABackend, b2 HABackend) { t.Fatalf("failed to get leader ch") } + // Check the value + held, val, err := lock.Value() + if err != nil { + t.Fatalf("err: %v", err) + } + if !held { + t.Fatalf("should be held") + } + if val != "bar" { + t.Fatalf("bad value: %v", err) + } + // Second acquisition should fail - lock2, err := b2.LockWith("foo") + lock2, err := b2.LockWith("foo", "baz") if err != nil { t.Fatalf("err: %v", err) } @@ -209,4 +221,16 @@ func testHABackend(t *testing.T, b HABackend, b2 HABackend) { if leaderCh2 == nil { t.Fatalf("should get leader ch") } + + // Check the value + held, val, err = lock.Value() + if err != nil { + t.Fatalf("err: %v", err) + } + if !held { + t.Fatalf("should be held") + } + if val != "baz" { + t.Fatalf("bad value: %v", err) + } }