From e4846c92b4ca63a55257ff2c2a4ef00246dd8bf0 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 24 Feb 2022 14:35:08 -0800 Subject: [PATCH] Move temporary etcd startup into etcd module Reuse the existing etcd library code to start up the temporary etcd server for bootstrap reconcile. This allows us to do proper health-checking of the datastore on startup, including handling of alarms. Signed-off-by: Brad Davidson --- go.mod | 2 +- go.sum | 8 +- pkg/cli/etcdsnapshot/etcd_snapshot.go | 16 ++- pkg/cluster/bootstrap.go | 179 ++++++++------------------ pkg/cluster/storage.go | 2 +- pkg/daemons/executor/embed.go | 8 +- pkg/daemons/executor/etcd.go | 10 +- pkg/etcd/etcd.go | 125 +++++++++++++++--- pkg/etcd/etcd_test.go | 9 +- 9 files changed, 193 insertions(+), 166 deletions(-) diff --git a/go.mod b/go.mod index 230c2b03daf..0df3e67b420 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/onsi/gomega v1.17.0 github.com/opencontainers/runc v1.0.3 github.com/opencontainers/selinux v1.8.3 - github.com/otiai10/copy v1.6.0 + github.com/otiai10/copy v1.7.0 github.com/pkg/errors v0.9.1 github.com/rancher/dynamiclistener v0.3.1 github.com/rancher/lasso v0.0.0-20210616224652-fc3ebd901c08 diff --git a/go.sum b/go.sum index a873b0f8197..eae96a972dd 100644 --- a/go.sum +++ b/go.sum @@ -922,13 +922,13 @@ github.com/opencontainers/selinux v1.8.3 h1:tzZR7AuKB5gU1+53uBkoG4XdIFGZzvJTOVoN github.com/opencontainers/selinux v1.8.3/go.mod h1:HTvjPFoGMbpQsG886e3lQwnsRWtE4TC1OF3OUvG9FAo= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/osrg/gobgp v0.0.0-20210801043420-9e48a36ed97c/go.mod h1:aNi0T2X6FSkl1evOifmJUsdxiQ1AQkiV7fIEtLIVv/U= -github.com/otiai10/copy v1.6.0 h1:IinKAryFFuPONZ7cm6T6E2QX/vcJwSnlaA5lfoaXIiQ= -github.com/otiai10/copy v1.6.0/go.mod h1:XWfuS3CrI0R6IE0FbgHsEazaXO8G0LpMp9o8tos0x4E= +github.com/otiai10/copy v1.7.0 h1:hVoPiN+t+7d2nzzwMiDHPSOogsWAStewq3TwU05+clE= +github.com/otiai10/copy 
v1.7.0/go.mod h1:rmRl6QPdJj6EiUqXQ/4Nn2lLXoNQjFCQbbNrxgc/t3U= github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE= github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs= github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo= -github.com/otiai10/mint v1.3.2 h1:VYWnrP5fXmz1MXvjuUvcBrXSjGE6xjON+axB/UrpO3E= -github.com/otiai10/mint v1.3.2/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc= +github.com/otiai10/mint v1.3.3 h1:7JgpsBaN0uMkyju4tbYHu0mnM55hNKVYLsXmwr15NQI= +github.com/otiai10/mint v1.3.3/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/paulmach/orb v0.1.3/go.mod h1:VFlX/8C+IQ1p6FTRRKzKoOPJnvEtA5G0Veuqwbu//Vk= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index e138e8db6d4..b98941174b0 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -88,7 +88,9 @@ func run(app *cli.Context, cfg *cmds.Server) error { ctx := signals.SetupSignalContext() e := etcd.NewETCD() - e.SetControlConfig(&serverConfig.ControlConfig) + if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil { + return err + } initialized, err := e.IsInitialized(ctx, &serverConfig.ControlConfig) if err != nil { @@ -138,7 +140,9 @@ func delete(app *cli.Context, cfg *cmds.Server) error { ctx := signals.SetupSignalContext() e := etcd.NewETCD() - e.SetControlConfig(&serverConfig.ControlConfig) + if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil { + return err + } sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin) if err != nil { @@ -179,7 +183,9 @@ func list(app *cli.Context, cfg *cmds.Server) error 
{ ctx := signals.SetupSignalContext() e := etcd.NewETCD() - e.SetControlConfig(&serverConfig.ControlConfig) + if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil { + return err + } sf, err := e.ListSnapshots(ctx) if err != nil { @@ -246,7 +252,9 @@ func prune(app *cli.Context, cfg *cmds.Server) error { ctx := signals.SetupSignalContext() e := etcd.NewETCD() - e.SetControlConfig(&serverConfig.ControlConfig) + if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil { + return err + } sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin) if err != nil { diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go index 4314b242e69..b61a0c4454a 100644 --- a/pkg/cluster/bootstrap.go +++ b/pkg/cluster/bootstrap.go @@ -9,7 +9,6 @@ import ( "io/ioutil" "net" "os" - "path" "path/filepath" "reflect" "strconv" @@ -23,11 +22,9 @@ import ( "github.com/rancher/k3s/pkg/bootstrap" "github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/daemons/config" - "github.com/rancher/k3s/pkg/daemons/executor" "github.com/rancher/k3s/pkg/etcd" "github.com/rancher/k3s/pkg/version" "github.com/sirupsen/logrus" - "go.etcd.io/etcd/server/v3/embed" ) // Bootstrap attempts to load a managed database driver, if one has been initialized or should be created/joined. @@ -63,58 +60,8 @@ func (c *Cluster) Bootstrap(ctx context.Context, snapshot bool) error { // reading the data, and comparing that to the data on disk, all the while // starting normal etcd. 
if isInitialized { - logrus.Info("Starting local etcd to reconcile with datastore") - tmpDataDir := filepath.Join(c.config.DataDir, "db", "tmp-etcd") - os.RemoveAll(tmpDataDir) - if err := os.Mkdir(tmpDataDir, 0700); err != nil { - return err - } - etcdDataDir := etcd.DBDir(c.config) - if err := createTmpDataDir(etcdDataDir, tmpDataDir); err != nil { - return err - } - defer func() { - if err := os.RemoveAll(tmpDataDir); err != nil { - logrus.Warn("failed to remove etcd temp dir", err) - } - }() - - args := executor.ETCDConfig{ - DataDir: tmpDataDir, - ForceNewCluster: true, - ListenClientURLs: "http://127.0.0.1:2399", - Logger: "zap", - HeartbeatInterval: 500, - ElectionTimeout: 5000, - LogOutputs: []string{"stderr"}, - } - configFile, err := args.ToConfigFile(c.config.ExtraEtcdArgs) - if err != nil { - return err - } - cfg, err := embed.ConfigFromFile(configFile) - if err != nil { - return err - } - - etcd, err := embed.StartEtcd(cfg) - if err != nil { - return err - } - defer etcd.Close() - - data, err := c.retrieveInitializedDBdata(ctx) - if err != nil { - return err - } - - ec := endpoint.ETCDConfig{ - Endpoints: []string{"http://127.0.0.1:2399"}, - LeaderElect: false, - } - - if err := c.ReconcileBootstrapData(ctx, bytes.NewReader(data.Bytes()), &c.config.Runtime.ControlRuntimeBootstrap, false, &ec); err != nil { - logrus.Fatal(err) + if err := c.reconcileEtcd(ctx); err != nil { + logrus.Fatalf("Failed to reconcile with temporary etcd: %v", err) } } } @@ -127,69 +74,6 @@ func (c *Cluster) Bootstrap(ctx context.Context, snapshot bool) error { return nil } -// copyFile copies the contents of the src file -// to the given destination file. 
-func copyFile(src, dst string) error { - srcfd, err := os.Open(src) - if err != nil { - return err - } - defer srcfd.Close() - - dstfd, err := os.Create(dst) - if err != nil { - return err - } - defer dstfd.Close() - - if _, err = io.Copy(dstfd, srcfd); err != nil { - return err - } - - srcinfo, err := os.Stat(src) - if err != nil { - return err - } - - return os.Chmod(dst, srcinfo.Mode()) -} - -// createTmpDataDir creates a temporary directory and copies the -// contents of the original etcd data dir to be used -// by etcd when reading data. -func createTmpDataDir(src, dst string) error { - srcinfo, err := os.Stat(src) - if err != nil { - return err - } - - if err := os.MkdirAll(dst, srcinfo.Mode()); err != nil { - return err - } - - fds, err := ioutil.ReadDir(src) - if err != nil { - return err - } - - for _, fd := range fds { - srcfp := path.Join(src, fd.Name()) - dstfp := path.Join(dst, fd.Name()) - - if fd.IsDir() { - if err = createTmpDataDir(srcfp, dstfp); err != nil { - fmt.Println(err) - } - } else { - if err = copyFile(srcfp, dstfp); err != nil { - fmt.Println(err) - } - } - } - - return nil -} - // shouldBootstrapLoad returns true if we need to load ControlRuntimeBootstrap data again and a second boolean // indicating that the server has or has not been initialized, if etcd. This is controlled by a stamp file on // disk that records successful bootstrap using a hash of the join token. @@ -340,7 +224,7 @@ func isMigrated(buf io.ReadSeeker, files *bootstrap.PathsDataformat) bool { // and depending on where the difference is. If the datastore is newer, // then the data will be written to disk. If the data on disk is newer, // k3s will exit with an error. 
-func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, crb *config.ControlRuntimeBootstrap, isHTTP bool, ec *endpoint.ETCDConfig) error { +func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, crb *config.ControlRuntimeBootstrap, isHTTP bool) error { logrus.Info("Reconciling bootstrap data between datastore and disk") if err := c.certDirsExist(); err != nil { @@ -383,14 +267,7 @@ func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, var value *client.Value - var etcdConfig endpoint.ETCDConfig - if ec != nil { - etcdConfig = *ec - } else { - etcdConfig = c.config.Runtime.EtcdConfig - } - - storageClient, err := client.New(etcdConfig) + storageClient, err := client.New(c.config.Runtime.EtcdConfig) if err != nil { return err } @@ -596,7 +473,7 @@ func (c *Cluster) httpBootstrap(ctx context.Context) error { return err } - return c.ReconcileBootstrapData(ctx, bytes.NewReader(content), &c.config.Runtime.ControlRuntimeBootstrap, true, nil) + return c.ReconcileBootstrapData(ctx, bytes.NewReader(content), &c.config.Runtime.ControlRuntimeBootstrap, true) } func (c *Cluster) retrieveInitializedDBdata(ctx context.Context) (*bytes.Buffer, error) { @@ -671,3 +548,49 @@ func ipsTo16Bytes(mySlice []*net.IPNet) { ipNet.IP = ipNet.IP.To16() } } + +// reconcileEtcd starts a temporary single-member etcd cluster using a copy of the +// etcd database, and uses it to reconcile bootstrap data. This is necessary +// because the full etcd cluster may not have quorum during startup, but we still +// need to extract data from the datastore. 
+func (c *Cluster) reconcileEtcd(ctx context.Context) error { + logrus.Info("Starting temporary etcd to reconcile with datastore") + + originalConfig := c.config.Runtime.EtcdConfig + c.config.Runtime.EtcdConfig = endpoint.ETCDConfig{Endpoints: []string{"http://127.0.0.1:2399"}} + reconcileCtx, cancel := context.WithCancel(ctx) + defer func() { + cancel() + c.config.Runtime.EtcdConfig = originalConfig + }() + + e := etcd.NewETCD() + if err := e.SetControlConfig(reconcileCtx, c.config); err != nil { + return err + } + if err := e.StartEmbeddedTemporary(reconcileCtx); err != nil { + return err + } + + for { + if err := e.Test(reconcileCtx); err != nil { + logrus.Infof("Failed to test data store connection: %v", err) + } else { + logrus.Info(e.EndpointName() + " data store connection OK") + break + } + + select { + case <-time.After(5 * time.Second): + case <-reconcileCtx.Done(): + return reconcileCtx.Err() + } + } + + data, err := c.retrieveInitializedDBdata(reconcileCtx) + if err != nil { + return err + } + + return c.ReconcileBootstrapData(reconcileCtx, bytes.NewReader(data.Bytes()), &c.config.Runtime.ControlRuntimeBootstrap, false) +} diff --git a/pkg/cluster/storage.go b/pkg/cluster/storage.go index f54ec741bb0..baecbc5507f 100644 --- a/pkg/cluster/storage.go +++ b/pkg/cluster/storage.go @@ -136,7 +136,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error { return err } - return c.ReconcileBootstrapData(ctx, bytes.NewReader(data), &c.config.Runtime.ControlRuntimeBootstrap, false, nil) + return c.ReconcileBootstrapData(ctx, bytes.NewReader(data), &c.config.Runtime.ControlRuntimeBootstrap, false) } // getBootstrapKeyFromStorage will list all keys that has prefix /bootstrap and will check for key that is diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go index da1aa16c284..8ae900836fc 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/daemons/executor/embed.go @@ -41,10 +41,6 @@ func init() { executor = &Embedded{} } -type Embedded struct { - nodeConfig
*daemonconfig.Node -} - func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error { e.nodeConfig = nodeConfig return nil @@ -193,6 +189,10 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan return nil } +func (e *Embedded) CurrentETCDOptions() (InitialOptions, error) { + return InitialOptions{}, nil +} + // waitForUntaintedNode watches nodes, waiting to find one not tainted as // uninitialized by the external cloud provider. func waitForUntaintedNode(ctx context.Context, kubeConfig string) error { diff --git a/pkg/daemons/executor/etcd.go b/pkg/daemons/executor/etcd.go index ab3adfaa81d..c6b4910142f 100644 --- a/pkg/daemons/executor/etcd.go +++ b/pkg/daemons/executor/etcd.go @@ -1,6 +1,3 @@ -//go:build !no_embedded_executor -// +build !no_embedded_executor - package executor import ( @@ -9,17 +6,18 @@ import ( "io/ioutil" "path/filepath" + daemonconfig "github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/version" "github.com/sirupsen/logrus" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" ) -func (e Embedded) CurrentETCDOptions() (InitialOptions, error) { - return InitialOptions{}, nil +type Embedded struct { + nodeConfig *daemonconfig.Node } -func (e Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error { +func (e *Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error { configFile, err := args.ToConfigFile(extraArgs) if err != nil { return err diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 94160347f7f..9eba38a8cdb 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "github.com/k3s-io/kine/pkg/client" endpoint2 "github.com/k3s-io/kine/pkg/endpoint" "github.com/minio/minio-go/v7" + cp "github.com/otiai10/copy" "github.com/pkg/errors" certutil "github.com/rancher/dynamiclistener/cert" "github.com/rancher/k3s/pkg/clientaccess" @@ -47,7 +48,7 @@ import 
( ) const ( - endpoint = "https://127.0.0.1:2379" + defaultEndpoint = "https://127.0.0.1:2379" testTimeout = time.Second * 10 manageTickerTime = time.Second * 15 learnerMaxStallTime = time.Minute * 5 @@ -118,8 +119,22 @@ func (e *ETCD) EndpointName() string { } // SetControlConfig sets the given config on the etcd struct. -func (e *ETCD) SetControlConfig(config *config.Control) { +func (e *ETCD) SetControlConfig(ctx context.Context, config *config.Control) error { e.config = config + + client, err := GetClient(ctx, e.config.Runtime) + if err != nil { + return err + } + e.client = client + + address, err := GetAdvertiseAddress(config.PrivateIP) + if err != nil { + return err + } + e.address = address + + return e.setName(false) } // Test ensures that the local node is a voting member of the target cluster. @@ -129,7 +144,8 @@ func (e *ETCD) Test(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, testTimeout) defer cancel() - status, err := e.client.Status(ctx, endpoint) + endpoints := getEndpoints(e.config.Runtime) + status, err := e.client.Status(ctx, endpoints[0]) if err != nil { return err } @@ -318,6 +334,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e if err != nil { return err } + logrus.Infof("Starting etcd for existing cluster member") return e.cluster(ctx, false, opt) } @@ -425,7 +442,7 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er cluster = append(cluster, fmt.Sprintf("%s=%s", e.name, e.peerURL())) } - logrus.Infof("Starting etcd for cluster %v", cluster) + logrus.Infof("Starting etcd to join cluster with members %v", cluster) return e.cluster(ctx, false, executor.InitialOptions{ Cluster: strings.Join(cluster, ","), State: "existing", @@ -436,7 +453,7 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er func (e *ETCD) Register(ctx context.Context, config *config.Control, handler http.Handler) (http.Handler, error) { e.config = config 
- client, err := GetClient(ctx, e.config.Runtime, endpoint) + client, err := GetClient(ctx, e.config.Runtime) if err != nil { return nil, err } @@ -447,7 +464,9 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt return nil, err } e.address = address - e.config.Datastore.Endpoint = endpoint + + endpoints := getEndpoints(config.Runtime) + e.config.Datastore.Endpoint = endpoints[0] e.config.Datastore.BackendTLSConfig.CAFile = e.config.Runtime.ETCDServerCA e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey @@ -530,7 +549,11 @@ func (e *ETCD) infoHandler() http.Handler { }) } -// GetClient returns an etcd client connected to the specified endpoints +// GetClient returns an etcd client connected to the specified endpoints. +// If no endpoints are provided, endpoints are retrieved from the provided runtime config. +// If the runtime config does not list any endpoints, the default endpoint is used. +// The returned client should be closed when no longer needed, in order to avoid leaking GRPC +// client goroutines. func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*clientv3.Client, error) { cfg, err := getClientConfig(ctx, runtime, endpoints...) if err != nil { @@ -540,8 +563,12 @@ func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints .. return clientv3.New(*cfg) } -// getClientConfig generates an etcd client config connected to the specified endpoints +// getClientConfig generates an etcd client config connected to the specified endpoints. +// If no endpoints are provided, getEndpoints is called to provide defaults. 
func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*clientv3.Config, error) { + if len(endpoints) == 0 { + endpoints = getEndpoints(runtime) + } tlsConfig, err := toTLSConfig(runtime) if err != nil { return nil, err @@ -556,6 +583,14 @@ func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoi }, nil } +// getEndpoints returns the endpoints from the runtime config if set, otherwise the default endpoint. +func getEndpoints(runtime *config.ControlRuntime) []string { + if len(runtime.EtcdConfig.Endpoints) > 0 { + return runtime.EtcdConfig.Endpoints + } + return []string{defaultEndpoint} +} + // toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable // for use by etcd. func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) { @@ -595,6 +630,7 @@ func GetAdvertiseAddress(advertiseIP string) (string, error) { // newCluster returns options to set up etcd for a new cluster func (e *ETCD) newCluster(ctx context.Context, reset bool) error { + logrus.Infof("Starting etcd for new cluster") err := e.cluster(ctx, reset, executor.InitialOptions{ AdvertisePeerURL: e.peerURL(), Cluster: fmt.Sprintf("%s=%s", e.name, e.peerURL()), @@ -637,7 +673,7 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error { } defer sqliteClient.Close() - etcdClient, err := GetClient(ctx, e.config.Runtime, "https://localhost:2379") + etcdClient, err := GetClient(ctx, e.config.Runtime) if err != nil { return err } @@ -708,6 +744,61 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init }, e.config.ExtraEtcdArgs) } +func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { + etcdDataDir := DBDir(e.config) + tmpDataDir := etcdDataDir + "-tmp" + + os.RemoveAll(tmpDataDir) + if err := os.Mkdir(tmpDataDir, 0700); err != nil { + return err + } + + defer func() { + if err := os.RemoveAll(tmpDataDir); err != nil { + logrus.Warnf("Failed to remove etcd temp 
dir: %v", err) + } + }() + + if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil { + return err + } + + endpoints := getEndpoints(e.config.Runtime) + clientURL := endpoints[0] + peerURL, err := addPort(endpoints[0], 1) + if err != nil { + return err + } + + embedded := executor.Embedded{} + return embedded.ETCD(ctx, executor.ETCDConfig{ + InitialOptions: executor.InitialOptions{AdvertisePeerURL: peerURL}, + DataDir: tmpDataDir, + ForceNewCluster: true, + AdvertiseClientURLs: clientURL, + ListenClientURLs: clientURL, + ListenPeerURLs: peerURL, + Logger: "zap", + HeartbeatInterval: 500, + ElectionTimeout: 5000, + Name: e.name, + LogOutputs: []string{"stderr"}, + }, nil) +} + +func addPort(address string, offset int) (string, error) { + u, err := url.Parse(address) + if err != nil { + return "", err + } + port, err := strconv.Atoi(u.Port()) + if err != nil { + return "", err + } + port += offset + return fmt.Sprintf("%s://%s:%d", u.Scheme, u.Hostname(), port), nil +} + // RemovePeer removes a peer from the cluster. The peer name and IP address must both match. func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRemoval bool) error { ctx, cancel := context.WithTimeout(ctx, memberRemovalTimeout) @@ -760,7 +851,8 @@ func (e *ETCD) manageLearners(ctx context.Context) { logrus.Debug("Etcd client was nil") continue } - if status, err := e.client.Status(ctx, endpoint); err != nil { + endpoints := getEndpoints(e.config.Runtime) + if status, err := e.client.Status(ctx, endpoints[0]); err != nil { logrus.Errorf("Failed to check local etcd status for learner management: %v", err) continue } else if status.Header.MemberId != status.Leader { @@ -913,7 +1005,8 @@ func (e *ETCD) clearAlarms(ctx context.Context) error { return nil } -// clientURLs returns a list of all non-learner etcd cluster member client access URLs +// clientURLs returns a list of all non-learner etcd cluster member client access URLs. 
+// The list is retrieved from the remote server that is being joined. func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP string) ([]string, Members, error) { var memberList Members resp, err := clientAccessInfo.Get("/db/info") @@ -980,7 +1073,7 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err if e.config == nil { e.config = config } - client, err := GetClient(ctx, e.config.Runtime, endpoint) + client, err := GetClient(ctx, e.config.Runtime) if err != nil { return err } @@ -1100,7 +1193,8 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { } } - status, err := e.client.Status(ctx, endpoint) + endpoints := getEndpoints(e.config.Runtime) + status, err := e.client.Status(ctx, endpoints[0]) if err != nil { return errors.Wrap(err, "failed to check etcd status for snapshot") } @@ -1115,7 +1209,7 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { return errors.Wrap(err, "failed to get the snapshot dir") } - cfg, err := getClientConfig(ctx, e.config.Runtime, endpoint) + cfg, err := getClientConfig(ctx, e.config.Runtime) if err != nil { return errors.Wrap(err, "failed to get config for etcd snapshot") } @@ -1838,8 +1932,9 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) } // GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd +// and unmarshal it to a list of apiserver endpoints. 
func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) { - cl, err := GetClient(ctx, cfg.Runtime, endpoint) + cl, err := GetClient(ctx, cfg.Runtime) if err != nil { return nil, err } diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 51b87b8d2b4..e8beabaa2fa 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -244,7 +244,7 @@ func Test_UnitETCD_Start(t *testing.T) { ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) e.config.EtcdDisableSnapshots = true testutil.GenerateRuntime(e.config) - client, err := GetClient(ctxInfo.ctx, e.config.Runtime, endpoint) + client, err := GetClient(ctxInfo.ctx, e.config.Runtime) e.client = client return err @@ -254,6 +254,7 @@ func Test_UnitETCD_Start(t *testing.T) { if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { return err } + e.client.Close() ctxInfo.cancel() time.Sleep(10 * time.Second) testutil.CleanupDataDir(e.config) @@ -274,7 +275,7 @@ func Test_UnitETCD_Start(t *testing.T) { setup: func(e *ETCD, ctxInfo *contextInfo) error { ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) testutil.GenerateRuntime(e.config) - client, err := GetClient(ctxInfo.ctx, e.config.Runtime, endpoint) + client, err := GetClient(ctxInfo.ctx, e.config.Runtime) e.client = client return err @@ -284,6 +285,7 @@ func Test_UnitETCD_Start(t *testing.T) { if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { return err } + e.client.Close() ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) @@ -306,7 +308,7 @@ func Test_UnitETCD_Start(t *testing.T) { if err := testutil.GenerateRuntime(e.config); err != nil { return err } - client, err := GetClient(ctxInfo.ctx, e.config.Runtime, endpoint) + client, err := GetClient(ctxInfo.ctx, e.config.Runtime) if err != nil { return err } @@ -318,6 +320,7 @@ func 
Test_UnitETCD_Start(t *testing.T) { if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { return err } + e.client.Close() ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config)