diff --git a/pkg/apiaddresses/controller.go b/pkg/apiaddresses/controller.go deleted file mode 100644 index b0ceefcdbc8..00000000000 --- a/pkg/apiaddresses/controller.go +++ /dev/null @@ -1,64 +0,0 @@ -package apiaddresses - -import ( - "bytes" - "context" - "encoding/json" - - "github.com/k3s-io/k3s/pkg/daemons/config" - "github.com/k3s-io/k3s/pkg/etcd" - "github.com/k3s-io/k3s/pkg/util" - "github.com/k3s-io/k3s/pkg/version" - controllerv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" - clientv3 "go.etcd.io/etcd/client/v3" - v1 "k8s.io/api/core/v1" -) - -type EndpointsControllerGetter func() controllerv1.EndpointsController - -func Register(ctx context.Context, runtime *config.ControlRuntime, endpoints controllerv1.EndpointsController) error { - h := &handler{ - endpointsController: endpoints, - runtime: runtime, - ctx: ctx, - } - endpoints.OnChange(ctx, version.Program+"-apiserver-lb-controller", h.sync) - - cl, err := etcd.GetClient(h.ctx, h.runtime) - if err != nil { - return err - } - h.etcdClient = cl - - go func() { - <-ctx.Done() - h.etcdClient.Close() - }() - - return nil -} - -type handler struct { - endpointsController controllerv1.EndpointsController - runtime *config.ControlRuntime - ctx context.Context - etcdClient *clientv3.Client -} - -// This controller will update the version.program/apiaddresses etcd key with a list of -// api addresses endpoints found in the kubernetes service in the default namespace -func (h *handler) sync(key string, endpoint *v1.Endpoints) (*v1.Endpoints, error) { - if endpoint != nil && - endpoint.Namespace == "default" && - endpoint.Name == "kubernetes" { - w := &bytes.Buffer{} - if err := json.NewEncoder(w).Encode(util.GetAddresses(endpoint)); err != nil { - return nil, err - } - _, err := h.etcdClient.Put(h.ctx, etcd.AddressKey, w.String()) - if err != nil { - return nil, err - } - } - return endpoint, nil -} diff --git a/pkg/etcd/apiaddresses_controller.go b/pkg/etcd/apiaddresses_controller.go new file mode 100644 index 00000000000..af181a81713 --- /dev/null +++ b/pkg/etcd/apiaddresses_controller.go @@ -0,0 +1,72 @@ +package etcd + +import ( + "bytes" + "context" + "encoding/json" + + "github.com/k3s-io/k3s/pkg/util" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" +) + +func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error { + if etcd.config.DisableAPIServer { + return nil + } + + endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints() + watch, err := endpoints.Watch(metav1.NamespaceDefault, metav1.ListOptions{ + FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(), + ResourceVersion: "0", + }) + if err != nil { + return err + } + + h := &handler{ + etcd: etcd, + watch: watch, + } + + logrus.Infof("Starting managed etcd apiserver addresses controller") + go h.watchEndpoints(ctx) + + return nil +} + +type handler struct { + etcd *ETCD + watch watch.Interface +} + +// This controller will update the version.program/apiaddresses etcd key with a list of +// api addresses endpoints found in the kubernetes service in the default namespace +func (h *handler) watchEndpoints(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-h.watch.ResultChan(): + endpoint, ok := ev.Object.(*v1.Endpoints) + if !ok { + logrus.Errorf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev) + continue + } + + w := &bytes.Buffer{} + if err := json.NewEncoder(w).Encode(util.GetAddresses(endpoint)); err != nil { + logrus.Warnf("Failed to encode apiserver addresses: %v", err) + continue + } + + _, err := h.etcd.client.Put(ctx, AddressKey, w.String()) + if err != nil { + logrus.Warnf("Failed to store apiserver addresses in etcd: %v", err) + } + } + } +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index b967011b098..587801d8a7a 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -535,13 +535,15 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt if err := e.setName(false); err != nil { return nil, err } + e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error { - RegisterMetadataHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node()) + registerMetadataHandlers(ctx, e) return nil } e.config.Runtime.LeaderElectedClusterControllerStart = func(ctx context.Context) error { - RegisterMemberHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node()) + registerMemberHandlers(ctx, e) + registerEndpointsHandlers(ctx, e) return nil } diff --git a/pkg/etcd/member_controller.go b/pkg/etcd/member_controller.go index b2aa3a5c0b3..c26df5c1dcf 100644 --- a/pkg/etcd/member_controller.go +++ b/pkg/etcd/member_controller.go @@ -11,12 +11,19 @@ import ( v1 "k8s.io/api/core/v1" ) -func RegisterMemberHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) { +func registerMemberHandlers(ctx context.Context, etcd *ETCD) { + if etcd.config.DisableETCD { + return + } + + nodes := etcd.config.Runtime.Core.Core().V1().Node() e := &etcdMemberHandler{ etcd: etcd, nodeController: nodes, ctx: ctx, } + + logrus.Infof("Starting managed etcd member removal controller") nodes.OnChange(ctx, "managed-etcd-controller", e.sync) nodes.OnRemove(ctx, "managed-etcd-controller", e.onRemove) } @@ -50,7 +57,7 @@ func (e *etcdMemberHandler) sync(key string, node *v1.Node) (*v1.Node, error) { if currentNodeName, ok := node.Annotations[NodeNameAnnotation]; ok { if currentNodeName != removed { // If the current node name is not the same as the removed node name, reset the tainted annotation and removed node name - logrus.Infof("Resetting removed node flag as removed node name ( did not match current node name") + logrus.Infof("Resetting removed node flag as removed node name (did not match current node name") delete(node.Annotations, removedNodeNameAnnotation) node.Annotations[removalAnnotation] = "false" return e.nodeController.Update(node) diff --git a/pkg/etcd/metadata_controller.go b/pkg/etcd/metadata_controller.go index 39b4bf3fd7b..915f5814da0 100644 --- a/pkg/etcd/metadata_controller.go +++ b/pkg/etcd/metadata_controller.go @@ -10,12 +10,19 @@ import ( v1 "k8s.io/api/core/v1" ) -func RegisterMetadataHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) { +func registerMetadataHandlers(ctx context.Context, etcd *ETCD) { + if etcd.config.DisableETCD { + return + } + + nodes := etcd.config.Runtime.Core.Core().V1().Node() h := &metadataHandler{ etcd: etcd, nodeController: nodes, ctx: ctx, } + + logrus.Infof("Starting managed etcd node label controller") nodes.OnChange(ctx, "managed-etcd-metadata-controller", h.sync) } @@ -32,7 +39,7 @@ func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) { nodeName := os.Getenv("NODE_NAME") if nodeName == "" { - logrus.Debug("waiting for node to be assigned for etcd controller") + logrus.Debug("waiting for node name to be assigned for managed etcd node label controller") m.nodeController.EnqueueAfter(key, 5*time.Second) return node, nil } @@ -48,8 +55,7 @@ func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) { if node.Annotations[NodeNameAnnotation] == m.etcd.name && node.Annotations[NodeAddressAnnotation] == m.etcd.address && node.Labels[EtcdRoleLabel] == "true" && - node.Labels[ControlPlaneLabel] == "true" || - m.etcd.config.DisableETCD { + node.Labels[ControlPlaneLabel] == "true" { return node, nil } diff --git a/pkg/server/server.go b/pkg/server/server.go index 56152417334..cc2b4f1eed5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -14,7 +14,6 @@ import ( "time" "github.com/k3s-io/helm-controller/pkg/helm" - "github.com/k3s-io/k3s/pkg/apiaddresses" "github.com/k3s-io/k3s/pkg/cli/cmds" "github.com/k3s-io/k3s/pkg/clientaccess" "github.com/k3s-io/k3s/pkg/daemons/config" @@ -236,10 +235,6 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error { return err } - if err := apiaddresses.Register(ctx, config.ControlConfig.Runtime, sc.Core.Core().V1().Endpoints()); err != nil { - return err - } - if config.ControlConfig.EncryptSecrets { if err := secretsencrypt.Register(ctx, sc.K8s,