Collapse UpdateAdvertiseAddr() into RunServiceDiscovery()

This commit is contained in:
Sean Chittenden 2016-04-25 13:46:28 -07:00
parent 85ca7b32ca
commit 9647f2e067
4 changed files with 56 additions and 52 deletions

View file

@ -290,12 +290,7 @@ func (c *ServerCommand) Run(args []string) int {
if coreConfig.HAPhysical != nil {
sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery)
if ok {
if err := sd.UpdateAdvertiseAddr(coreConfig.AdvertiseAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error configuring service discovery: %v", err))
return 1
}
if err := sd.RunServiceDiscovery(c.ShutdownCh); err != nil {
if err := sd.RunServiceDiscovery(c.ShutdownCh, coreConfig.AdvertiseAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err))
return 1
}

View file

@ -173,23 +173,6 @@ func newConsulBackend(conf map[string]string) (Backend, error) {
return c, nil
}
// UpdateAdvertiseAddr provides a pre-initialization hook for updating
// Consul's advertise address.
func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error {
if c.running {
return fmt.Errorf("service registration unable to update advertise address, backend already running")
}
host, port, err := parseAdvertiseAddr(addr)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise address "%v": {{err}}`, addr), err)
}
c.advertiseHost = host
c.advertisePort = int(port)
return nil
}
// serviceTags returns all of the relevant tags for Consul.
func serviceTags(active bool) []string {
activeTag := "standby"
@ -218,13 +201,13 @@ func (c *ConsulBackend) AdvertiseActive(active bool) error {
defer atomic.CompareAndSwapInt64(&c.registrationLock, 1, 0)
// Retry agent registration until successful
registration_complete:
for {
c.service.Tags = serviceTags(c.active)
agent := c.client.Agent()
err := agent.ServiceRegister(c.service)
if err == nil {
break registration_complete
// Success
return nil
}
// wtb logger c.logger.Printf("[WARN] service registration failed: %v", err)
@ -233,11 +216,13 @@ func (c *ConsulBackend) AdvertiseActive(active bool) error {
c.serviceLock.Lock()
if !c.running {
// Shutting down
return err
}
}
}
// Successful concurrent update to active state
return nil
}
@ -259,7 +244,15 @@ func (c *ConsulBackend) AdvertiseSealed(sealed bool) error {
return nil
}
func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err error) {
func (c *ConsulBackend) setAdvertiseAddr(addr string) (err error) {
c.advertiseHost, c.advertisePort, err = c.parseAdvertiseAddr(addr)
if err != nil {
return err
}
return nil
}
func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) (err error) {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
@ -267,8 +260,8 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err err
return nil
}
if c.running {
return fmt.Errorf("service registration routine already running")
if err := c.setAdvertiseAddr(advertiseAddr); err != nil {
return err
}
serviceID := c.serviceID()
@ -376,7 +369,7 @@ func (c *ConsulBackend) serviceID() string {
return fmt.Sprintf("%s:%s:%d", c.serviceName, c.advertiseHost, c.advertisePort)
}
func parseAdvertiseAddr(addr string) (host string, port int, err error) {
func (c *ConsulBackend) parseAdvertiseAddr(addr string) (host string, port int, err error) {
if addr == "" {
return "", -1, fmt.Errorf("advertise address must not be empty")
}
@ -389,10 +382,19 @@ func parseAdvertiseAddr(addr string) (host string, port int, err error) {
var portStr string
host, portStr, err = net.SplitHostPort(url.Host)
if err != nil {
return "", -3, errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
if url.Scheme == "http" {
portStr = "80"
} else if url.Scheme == "https" {
portStr = "443"
} else if url.Scheme == "unix" {
portStr = "0"
host = url.Path
} else {
return "", -3, errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
}
}
portNum, err := strconv.ParseInt(portStr, 10, 0)
if err != nil || portNum < 1 || portNum > 65535 {
if err != nil || portNum < 0 || portNum > 65535 {
return "", -4, errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err)
}

View file

@ -27,7 +27,6 @@ func testConsulBackend(t *testing.T) *ConsulBackend {
}
func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend {
const serviceID = "vaultTestService"
be, err := newConsulBackend(*conf)
if err != nil {
t.Fatalf("Expected Consul to initialize: %v", err)
@ -38,8 +37,10 @@ func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend {
t.Fatalf("Expected ConsulBackend")
}
c.consulClientConf = api.DefaultConfig()
c.service = &api.AgentServiceRegistration{
ID: serviceID,
ID: c.serviceID(),
Name: c.serviceName,
Tags: serviceTags(c.active),
Port: 8200,
@ -51,7 +52,7 @@ func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend {
ID: c.checkID(),
Name: "Vault Sealed Status",
Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
ServiceID: serviceID,
ServiceID: c.serviceID(),
AgentServiceCheck: api.AgentServiceCheck{
TTL: c.checkTimeout.String(),
Status: api.HealthPassing,
@ -159,7 +160,10 @@ func TestConsul_newConsulBackend(t *testing.T) {
if !ok {
t.Fatalf("Expected ConsulBackend: %s", test.name)
}
if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil {
c.disableRegistration = true
var shutdownCh ShutdownChannel
if err := c.RunServiceDiscovery(shutdownCh, test.advertiseAddr); err != nil {
t.Fatalf("bad: %v", err)
}
@ -217,7 +221,7 @@ func TestConsul_serviceTags(t *testing.T) {
}
}
func TestConsul_UpdateAdvertiseAddr(t *testing.T) {
func TestConsul_parseAdvertiseAddr(t *testing.T) {
tests := []struct {
addr string
host string
@ -236,6 +240,18 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) {
port: 8200,
pass: true,
},
{
addr: "https://127.0.0.1:8200",
host: "127.0.0.1",
port: 8200,
pass: true,
},
{
addr: "unix:///tmp/.vault.addr.sock",
host: "/tmp/.vault.addr.sock",
port: 0,
pass: true,
},
{
addr: "127.0.0.1:8200",
pass: false,
@ -247,11 +263,7 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) {
}
for _, test := range tests {
c := testConsulBackend(t)
if c == nil {
t.Fatalf("bad")
}
err := c.UpdateAdvertiseAddr(test.addr)
host, port, err := c.parseAdvertiseAddr(test.addr)
if test.pass {
if err != nil {
t.Fatalf("bad: %v", err)
@ -264,12 +276,12 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) {
}
}
if c.advertiseHost != test.host {
t.Fatalf("bad: %v != %v", c.advertiseHost, test.host)
if host != test.host {
t.Fatalf("bad: %v != %v", host, test.host)
}
if c.advertisePort != test.port {
t.Fatalf("bad: %v != %v", c.advertisePort, test.port)
if port != test.port {
t.Fatalf("bad: %v != %v", port, test.port)
}
}
}
@ -394,7 +406,7 @@ func TestConsul_serviceID(t *testing.T) {
"service": test.serviceName,
})
if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil {
if err := c.setAdvertiseAddr(test.advertiseAddr); err != nil {
t.Fatalf("bad: %s %v", test.name, err)
}

View file

@ -59,12 +59,7 @@ type ServiceDiscovery interface {
// Run executes any background service discovery tasks until the
// shutdown channel is closed.
RunServiceDiscovery(ShutdownChannel) error
// UpdateAdvertiseAddr allows for a non-Running backend to update the
// advertise address. HABackends may want to present a different
// address that wasn't available when a Backend was created.
UpdateAdvertiseAddr(addr string) error
RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) error
}
type Lock interface {