From 5e9e9cfff6d4cd93be3bdb8815341f417ca7f856 Mon Sep 17 00:00:00 2001 From: Ken Breeman Date: Tue, 12 May 2015 00:37:08 -0400 Subject: [PATCH 1/6] Rough implementation of Zookeeper HA physical backend. Contains breaking changes to 'path' config. Has unresolved TODO's. --- physical/zookeeper.go | 131 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 119 insertions(+), 12 deletions(-) diff --git a/physical/zookeeper.go b/physical/zookeeper.go index 1b01f4cbc6..b898becd56 100644 --- a/physical/zookeeper.go +++ b/physical/zookeeper.go @@ -14,7 +14,8 @@ import ( // prefix within Zookeeper. It is used in production situations as // it allows Vault to run on multiple machines in a highly-available manner. type ZookeeperBackend struct { - path string + datapath string + lockpath string client *zk.Conn } @@ -22,18 +23,20 @@ type ZookeeperBackend struct { // and the prefix in the KV store. func newZookeeperBackend(conf map[string]string) (Backend, error) { // Get the path in Zookeeper - path, ok := conf["path"] + basepath, ok := conf["path"] if !ok { - path = "vault/" + basepath = "vault/" } // Ensure path is suffixed and prefixed (zk requires prefix /) - if !strings.HasSuffix(path, "/") { - path += "/" + if !strings.HasSuffix(basepath, "/") { + basepath += "/" } - if !strings.HasPrefix(path, "/") { - path = "/" + path + if !strings.HasPrefix(basepath, "/") { + basepath = "/" + basepath } + datapath := basepath + "data/" + lockpath := basepath + "lock/" // Configure the client, default to localhost instance var machines string @@ -50,7 +53,8 @@ func newZookeeperBackend(conf map[string]string) (Backend, error) { // Setup the backend c := &ZookeeperBackend{ - path: path, + datapath: datapath, + lockpath: lockpath, client: client, } return c, nil @@ -93,7 +97,7 @@ func (c *ZookeeperBackend) Put(entry *Entry) error { defer metrics.MeasureSince([]string{"zookeeper", "put"}, time.Now()) // Attempt to set the full path - fullPath := c.path + entry.Key + fullPath := c.datapath + entry.Key _, err := c.client.Set(fullPath, entry.Value, 0) // If we get ErrNoNode, we need to construct the path hierarchy @@ -108,7 +112,7 @@ func (c *ZookeeperBackend) Get(key string) (*Entry, error) { defer metrics.MeasureSince([]string{"zookeeper", "get"}, time.Now()) // Attempt to read the full path - fullPath := c.path + key + fullPath := c.datapath + key value, _, err := c.client.Get(fullPath) // Ignore if the node does not exist @@ -135,7 +139,7 @@ func (c *ZookeeperBackend) Delete(key string) error { defer metrics.MeasureSince([]string{"zookeeper", "delete"}, time.Now()) // Delete the full path - fullPath := c.path + key + fullPath := c.datapath + key err := c.client.Delete(fullPath, -1) // Mask if the node does not exist @@ -151,7 +155,7 @@ func (c *ZookeeperBackend) List(prefix string) ([]string, error) { defer metrics.MeasureSince([]string{"zookeeper", "list"}, time.Now()) // Query the children at the full path - fullPath := strings.TrimSuffix(c.path+prefix, "/") + fullPath := strings.TrimSuffix(c.datapath+prefix, "/") result, _, err := c.client.Children(fullPath) // If the path nodes are missing, no children! @@ -174,3 +178,106 @@ func (c *ZookeeperBackend) List(prefix string) ([]string, error) { sort.Strings(children) return children, nil } + +// LockWith is used for mutual exclusion based on the given key. +func (c *ZookeeperBackend) LockWith(key, value string) (Lock, error) { + l := &ZookeeperHALock{ + in: c, + key: key, + value: value, + } + return l, nil +} + +func (c *ZookeeperBackend) DetectHostAddr() (string, error) { + // TODO: implement this! + return "", nil +} + +// ZookeeperHALock is a Zookeeper Lock implementation for the HABackend +type ZookeeperHALock struct { + in *ZookeeperBackend + key string + value string + + held bool + leaderCh chan struct{} + zkLock *zk.Lock +} + +func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + if i.held { + return nil, fmt.Errorf("lock already held") + } + + // Attempt an async acquisition + didLock := make(chan struct{}) + failLock := make(chan error, 1) + releaseCh := make(chan bool, 1) + go func() { + // Wait to acquire the lock in ZK + acl := zk.WorldACL(zk.PermAll) + lockpath := i.in.lockpath + i.key + lock := zk.NewLock(i.in.client, lockpath, acl) + err := lock.Lock() + if err != nil { + failLock <- err + return + } + // Set node value + err2 := i.in.ensurePath(lockpath, []byte(i.value)) + if err2 != nil { + failLock <- err2 + lock.Unlock() + return + } + i.zkLock = lock + + // Signal that lock is held + close(didLock) + + // Handle an early abort + release := <-releaseCh + if release { + lock.Unlock() + } + }() + + // Wait for lock acquisition, failure, or shutdown + select { + case <-didLock: + releaseCh <- false + case err := <-failLock: + return nil, err + case <-stopCh: + releaseCh <- true + return nil, nil + } + + // Create the leader channel + i.held = true + i.leaderCh = make(chan struct{}) + + // TODO: Watch for Events which could result in loss of our zkLock and close(i.leaderCh) + + return i.leaderCh, nil +} + +func (i *ZookeeperHALock) Unlock() error { + if !i.held { + return nil + } + + close(i.leaderCh) + i.leaderCh = nil + i.held = false + i.zkLock.Unlock() + return nil +} + +func (i *ZookeeperHALock) Value() (bool, string, error) { + lockpath := i.in.lockpath + i.key + value, _, err := i.in.client.Get(lockpath) + return i.held, string(value), err +} + From df0d430c003e9c28d0a3cbf6338b7411dc984d2d Mon Sep 17 00:00:00 2001 From: Ken Breeman Date: Wed, 20 May 2015 22:54:35 -0400 Subject: [PATCH 2/6] Implement HA lock loss detection for zookeeper physical backend --- physical/zookeeper.go | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/physical/zookeeper.go b/physical/zookeeper.go index b898becd56..2050ae82f8 100644 --- a/physical/zookeeper.go +++ b/physical/zookeeper.go @@ -214,10 +214,10 @@ func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) didLock := make(chan struct{}) failLock := make(chan error, 1) releaseCh := make(chan bool, 1) + lockpath := i.in.lockpath + i.key go func() { // Wait to acquire the lock in ZK acl := zk.WorldACL(zk.PermAll) - lockpath := i.in.lockpath + i.key lock := zk.NewLock(i.in.client, lockpath, acl) err := lock.Lock() if err != nil { @@ -258,7 +258,33 @@ func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) i.held = true i.leaderCh = make(chan struct{}) - // TODO: Watch for Events which could result in loss of our zkLock and close(i.leaderCh) + // Watch for Events which could result in loss of our zkLock and close(i.leaderCh) + currentVal, _, lockeventCh, err3 := i.in.client.GetW(lockpath) + if err3 != nil { + return nil, fmt.Errorf("unable to watch HA lock") + } + if i.value != string(currentVal) { + return nil, fmt.Errorf("lost HA lock immediately before watch") + } + go func() { + for { + select { + case event := <- lockeventCh: + // Lost connection? + if event.State != zk.StateConnected && event.State != zk.StateHasSession { + close(i.leaderCh) + } + // Lost watch + if event.Type == zk.EventNotWatching { + close(i.leaderCh) + } + // Lock changed + if event.Type == zk.EventNodeCreated || event.Type == zk.EventNodeDeleted || event.Type == zk.EventNodeDataChanged { + close(i.leaderCh) + } + } + } + }() return i.leaderCh, nil } From 0805ce27e638b15904b53f6760a8555210bdc6d9 Mon Sep 17 00:00:00 2001 From: Ken Breeman Date: Wed, 20 May 2015 23:15:31 -0400 Subject: [PATCH 3/6] Restore backwards compatibility for zookeeper physical backend. Vault already prevents locks and data from overlapping internally. --- physical/zookeeper.go | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/physical/zookeeper.go b/physical/zookeeper.go index da9530fd90..25a12c8ac1 100644 --- a/physical/zookeeper.go +++ b/physical/zookeeper.go @@ -14,8 +14,7 @@ import ( // prefix within Zookeeper. It is used in production situations as // it allows Vault to run on multiple machines in a highly-available manner. type ZookeeperBackend struct { - datapath string - lockpath string + path string client *zk.Conn } @@ -23,20 +22,18 @@ type ZookeeperBackend struct { // and the prefix in the KV store. func newZookeeperBackend(conf map[string]string) (Backend, error) { // Get the path in Zookeeper - basepath, ok := conf["path"] + path, ok := conf["path"] if !ok { - basepath = "vault/" + path = "vault/" } // Ensure path is suffixed and prefixed (zk requires prefix /) - if !strings.HasSuffix(basepath, "/") { - basepath += "/" + if !strings.HasSuffix(path, "/") { + path += "/" } - if !strings.HasPrefix(basepath, "/") { - basepath = "/" + basepath + if !strings.HasPrefix(path, "/") { + path = "/" + path } - datapath := basepath + "data/" - lockpath := basepath + "lock/" // Configure the client, default to localhost instance var machines string @@ -53,8 +50,7 @@ func newZookeeperBackend(conf map[string]string) (Backend, error) { // Setup the backend c := &ZookeeperBackend{ - datapath: datapath, - lockpath: lockpath, + path: path, client: client, } return c, nil @@ -117,7 +113,7 @@ func (c *ZookeeperBackend) Put(entry *Entry) error { defer metrics.MeasureSince([]string{"zookeeper", "put"}, time.Now()) // Attempt to set the full path - fullPath := c.datapath + entry.Key + fullPath := c.path + entry.Key _, err := c.client.Set(fullPath, entry.Value, -1) // If we get ErrNoNode, we need to construct the path hierarchy @@ -132,7 +128,7 @@ func (c *ZookeeperBackend) Get(key string) (*Entry, error) { defer metrics.MeasureSince([]string{"zookeeper", "get"}, time.Now()) // Attempt to read the full path - fullPath := c.datapath + key + fullPath := c.path + key value, _, err := c.client.Get(fullPath) // Ignore if the node does not exist @@ -159,7 +155,7 @@ func (c *ZookeeperBackend) Delete(key string) error { defer metrics.MeasureSince([]string{"zookeeper", "delete"}, time.Now()) // Delete the full path - fullPath := c.datapath + key + fullPath := c.path + key err := c.deletePath(fullPath) // Mask if the node does not exist @@ -175,7 +171,7 @@ func (c *ZookeeperBackend) List(prefix string) ([]string, error) { defer metrics.MeasureSince([]string{"zookeeper", "list"}, time.Now()) // Query the children at the full path - fullPath := strings.TrimSuffix(c.datapath+prefix, "/") + fullPath := strings.TrimSuffix(c.path+prefix, "/") result, _, err := c.client.Children(fullPath) // If the path nodes are missing, no children! @@ -234,7 +230,7 @@ func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) didLock := make(chan struct{}) failLock := make(chan error, 1) releaseCh := make(chan bool, 1) - lockpath := i.in.lockpath + i.key + lockpath := i.in.path + i.key go func() { // Wait to acquire the lock in ZK acl := zk.WorldACL(zk.PermAll) @@ -322,7 +318,7 @@ func (i *ZookeeperHALock) Unlock() error { } func (i *ZookeeperHALock) Value() (bool, string, error) { - lockpath := i.in.lockpath + i.key + lockpath := i.in.path + i.key value, _, err := i.in.client.Get(lockpath) return i.held, string(value), err } From df12702f6813ba3bb92bffc822b9344a490aa50a Mon Sep 17 00:00:00 2001 From: Ken Breeman Date: Mon, 25 May 2015 22:14:00 -0400 Subject: [PATCH 4/6] Improvements based on PR feedback: removed empty detectAddress function, moved anonymous functions to named ones, added localLock mutex around i.held --- physical/zookeeper.go | 115 ++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 55 deletions(-) diff --git a/physical/zookeeper.go b/physical/zookeeper.go index 25a12c8ac1..e99a69bbac 100644 --- a/physical/zookeeper.go +++ b/physical/zookeeper.go @@ -4,6 +4,7 @@ import ( "fmt" "sort" "strings" + "sync" "time" "github.com/armon/go-metrics" @@ -205,23 +206,21 @@ func (c *ZookeeperBackend) LockWith(key, value string) (Lock, error) { return l, nil } -func (c *ZookeeperBackend) DetectHostAddr() (string, error) { - // TODO: implement this! - return "", nil -} - // ZookeeperHALock is a Zookeeper Lock implementation for the HABackend type ZookeeperHALock struct { in *ZookeeperBackend key string value string - held bool - leaderCh chan struct{} - zkLock *zk.Lock + held bool + localLock sync.Mutex + leaderCh chan struct{} + zkLock *zk.Lock } func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + i.localLock.Lock() + defer i.localLock.Unlock() if i.held { return nil, fmt.Errorf("lock already held") } @@ -231,33 +230,7 @@ func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) failLock := make(chan error, 1) releaseCh := make(chan bool, 1) lockpath := i.in.path + i.key - go func() { - // Wait to acquire the lock in ZK - acl := zk.WorldACL(zk.PermAll) - lock := zk.NewLock(i.in.client, lockpath, acl) - err := lock.Lock() - if err != nil { - failLock <- err - return - } - // Set node value - err2 := i.in.ensurePath(lockpath, []byte(i.value)) - if err2 != nil { - failLock <- err2 - lock.Unlock() - return - } - i.zkLock = lock - - // Signal that lock is held - close(didLock) - - // Handle an early abort - release := <-releaseCh - if release { - lock.Unlock() - } - }() + go i.attemptLock(lockpath, didLock, failLock, releaseCh) // Wait for lock acquisition, failure, or shutdown select { @@ -282,30 +255,62 @@ func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) if i.value != string(currentVal) { return nil, fmt.Errorf("lost HA lock immediately before watch") } - go func() { - for { - select { - case event := <- lockeventCh: - // Lost connection? - if event.State != zk.StateConnected && event.State != zk.StateHasSession { - close(i.leaderCh) - } - // Lost watch - if event.Type == zk.EventNotWatching { - close(i.leaderCh) - } - // Lock changed - if event.Type == zk.EventNodeCreated || event.Type == zk.EventNodeDeleted || event.Type == zk.EventNodeDataChanged { - close(i.leaderCh) - } - } - } - }() + go i.monitorLock(lockeventCh, i.leaderCh) return i.leaderCh, nil } +func (i *ZookeeperHALock) attemptLock(lockpath string, didLock chan struct{}, failLock chan error, releaseCh chan bool) { + // Wait to acquire the lock in ZK + acl := zk.WorldACL(zk.PermAll) + lock := zk.NewLock(i.in.client, lockpath, acl) + err := lock.Lock() + if err != nil { + failLock <- err + return + } + // Set node value + err2 := i.in.ensurePath(lockpath, []byte(i.value)) + if err2 != nil { + failLock <- err2 + lock.Unlock() + return + } + i.zkLock = lock + + // Signal that lock is held + close(didLock) + + // Handle an early abort + release := <-releaseCh + if release { + lock.Unlock() + } +} + +func (i *ZookeeperHALock) monitorLock(lockeventCh <-chan zk.Event, leaderCh chan struct{}) { + for { + select { + case event := <- lockeventCh: + // Lost connection? + if event.State != zk.StateConnected && event.State != zk.StateHasSession { + close(i.leaderCh) + } + // Lost watch + if event.Type == zk.EventNotWatching { + close(i.leaderCh) + } + // Lock changed + if event.Type == zk.EventNodeCreated || event.Type == zk.EventNodeDeleted || event.Type == zk.EventNodeDataChanged { + close(i.leaderCh) + } + } + } +} + func (i *ZookeeperHALock) Unlock() error { + i.localLock.Lock() + defer i.localLock.Unlock() if !i.held { return nil } @@ -320,6 +325,6 @@ func (i *ZookeeperHALock) Unlock() error { func (i *ZookeeperHALock) Value() (bool, string, error) { lockpath := i.in.path + i.key value, _, err := i.in.client.Get(lockpath) - return i.held, string(value), err + return (value != nil), string(value), err } From 7c5892d26d52ea8e8c865a39b57fb056d173590d Mon Sep 17 00:00:00 2001 From: Ken Breeman Date: Tue, 26 May 2015 00:12:16 -0400 Subject: [PATCH 5/6] Cleaned up zookeeper_ha locking, added tests and cleanup. --- physical/physical_test.go | 2 ++ physical/zookeeper.go | 14 ++++++----- physical/zookeeper_test.go | 51 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/physical/physical_test.go b/physical/physical_test.go index da7e97b122..0fc805e84e 100644 --- a/physical/physical_test.go +++ b/physical/physical_test.go @@ -264,4 +264,6 @@ func testHABackend(t *testing.T, b HABackend, b2 HABackend) { if val != "baz" { t.Fatalf("bad value: %v", err) } + // Cleanup + lock2.Unlock() } diff --git a/physical/zookeeper.go b/physical/zookeeper.go index e99a69bbac..dfe7acfe68 100644 --- a/physical/zookeeper.go +++ b/physical/zookeeper.go @@ -255,7 +255,7 @@ func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) if i.value != string(currentVal) { return nil, fmt.Errorf("lost HA lock immediately before watch") } - go i.monitorLock(lockeventCh, i.leaderCh) + go i.monitorLock(lockeventCh) return i.leaderCh, nil } @@ -270,7 +270,8 @@ func (i *ZookeeperHALock) attemptLock(lockpath string, didLock chan struct{}, fa return } // Set node value - err2 := i.in.ensurePath(lockpath, []byte(i.value)) + data := []byte(i.value) + err2 := i.in.ensurePath(lockpath, data) if err2 != nil { failLock <- err2 lock.Unlock() @@ -288,21 +289,24 @@ func (i *ZookeeperHALock) attemptLock(lockpath string, didLock chan struct{}, fa } } -func (i *ZookeeperHALock) monitorLock(lockeventCh <-chan zk.Event, leaderCh chan struct{}) { +func (i *ZookeeperHALock) monitorLock(lockeventCh <-chan zk.Event) { for { select { case event := <- lockeventCh: // Lost connection? - if event.State != zk.StateConnected && event.State != zk.StateHasSession { + if event.State == zk.StateUnknown || event.State == zk.StateDisconnected || event.State == zk.StateConnecting || event.State == zk.StateAuthFailed || event.State == zk.StateConnectedReadOnly || event.State == zk.StateExpired { close(i.leaderCh) + return } // Lost watch if event.Type == zk.EventNotWatching { close(i.leaderCh) + return } // Lock changed if event.Type == zk.EventNodeCreated || event.Type == zk.EventNodeDeleted || event.Type == zk.EventNodeDataChanged { close(i.leaderCh) + return } } } @@ -315,8 +319,6 @@ func (i *ZookeeperHALock) Unlock() error { return nil } - close(i.leaderCh) - i.leaderCh = nil i.held = false i.zkLock.Unlock() return nil diff --git a/physical/zookeeper_test.go b/physical/zookeeper_test.go index 31fd7f3813..9c76277dd4 100644 --- a/physical/zookeeper_test.go +++ b/physical/zookeeper_test.go @@ -30,7 +30,11 @@ func TestZookeeperBackend(t *testing.T) { } defer func() { + client.Delete(randPath + "/foo/bar/baz", -1) + client.Delete(randPath + "/foo/bar", -1) + client.Delete(randPath + "/foo", -1) client.Delete(randPath, -1) + client.Close() }() b, err := NewBackend("zookeeper", map[string]string{ @@ -44,3 +48,50 @@ func TestZookeeperBackend(t *testing.T) { testBackend(t, b) testBackend_ListPrefix(t, b) } + +func TestZookeeperHABackend(t *testing.T) { + addr := os.Getenv("ZOOKEEPER_ADDR") + if addr == "" { + t.SkipNow() + } + + client, _, err := zk.Connect([]string{addr}, time.Second) + + if err != nil { + t.Fatalf("err: %v", err) + } + + randPath := fmt.Sprintf("/vault-ha-%d", time.Now().Unix()) + acl := zk.WorldACL(zk.PermAll) + _, err = client.Create(randPath, []byte("hi"), int32(0), acl) + + if err != nil { + t.Fatalf("err: %v", err) + } + + defer func() { + client.Delete(randPath + "/foo", -1) + client.Delete(randPath, -1) + client.Close() + }() + + b, err := NewBackend("zookeeper", map[string]string{ + "address": addr + "," + addr, + "path": randPath, + }) + if err != nil { + t.Fatalf("err: %s", err) + } + + ha, ok := b.(HABackend) + if !ok { + t.Fatalf("zookeeper does not implement HABackend") + } + testHABackend(t, ha, ha) + + err = client.Delete(randPath + "/foo", -1) + if err != nil { + t.Fatalf("err: failed to cleanup! %s", err) + } + +} From 794cca7f852654e60f08295d2611f4d7ee243d26 Mon Sep 17 00:00:00 2001 From: Ken Breeman Date: Thu, 28 May 2015 00:39:12 -0400 Subject: [PATCH 6/6] Cleaned up error handling and HA lock monitoring for zookeeper physical backend based on PR feedback. --- physical/zookeeper.go | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/physical/zookeeper.go b/physical/zookeeper.go index dfe7acfe68..ad87935c94 100644 --- a/physical/zookeeper.go +++ b/physical/zookeeper.go @@ -248,14 +248,14 @@ func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) i.leaderCh = make(chan struct{}) // Watch for Events which could result in loss of our zkLock and close(i.leaderCh) - currentVal, _, lockeventCh, err3 := i.in.client.GetW(lockpath) - if err3 != nil { - return nil, fmt.Errorf("unable to watch HA lock") + currentVal, _, lockeventCh, err := i.in.client.GetW(lockpath) + if err != nil { + return nil, fmt.Errorf("unable to watch HA lock: %v", err) } if i.value != string(currentVal) { return nil, fmt.Errorf("lost HA lock immediately before watch") } - go i.monitorLock(lockeventCh) + go i.monitorLock(lockeventCh, i.leaderCh) return i.leaderCh, nil } @@ -271,9 +271,9 @@ func (i *ZookeeperHALock) attemptLock(lockpath string, didLock chan struct{}, fa } // Set node value data := []byte(i.value) - err2 := i.in.ensurePath(lockpath, data) - if err2 != nil { - failLock <- err2 + err = i.in.ensurePath(lockpath, data) + if err != nil { + failLock <- err lock.Unlock() return } @@ -289,23 +289,26 @@ func (i *ZookeeperHALock) attemptLock(lockpath string, didLock chan struct{}, fa } } -func (i *ZookeeperHALock) monitorLock(lockeventCh <-chan zk.Event) { +func (i *ZookeeperHALock) monitorLock(lockeventCh <-chan zk.Event, leaderCh chan struct{}) { for { select { case event := <- lockeventCh: // Lost connection? - if event.State == zk.StateUnknown || event.State == zk.StateDisconnected || event.State == zk.StateConnecting || event.State == zk.StateAuthFailed || event.State == zk.StateConnectedReadOnly || event.State == zk.StateExpired { - close(i.leaderCh) + switch event.State { + case zk.StateConnected: + case zk.StateSyncConnected: + case zk.StateHasSession: + default: + close(leaderCh) return } - // Lost watch - if event.Type == zk.EventNotWatching { - close(i.leaderCh) - return - } - // Lock changed - if event.Type == zk.EventNodeCreated || event.Type == zk.EventNodeDeleted || event.Type == zk.EventNodeDataChanged { - close(i.leaderCh) + + // Lost lock? + switch event.Type { + case zk.EventNodeChildrenChanged: + case zk.EventSession: + default: + close(leaderCh) return } }