From f37e7565b87d64c3bc3260e0ee11d826798eb63c Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Tue, 5 Apr 2022 17:11:24 -0700 Subject: [PATCH] Move the apiserver addresses controller into the etcd package This controller only needs to run when using managed etcd, so move it in with the rest of the etcd stuff. This change also modifies the controller to only watch the Kubernetes service endpoint, instead of watching all endpoints in the entire cluster. Fixes an error message revealed by use of a newer grpc client in Kubernetes 1.24, which logs an error when the Put to etcd failed because kine doesn't support the etcd Put operation. The controller shouldn't have been running without etcd in the first place. Signed-off-by: Brad Davidson --- pkg/apiaddresses/controller.go | 64 ------------------------- pkg/etcd/apiaddresses_controller.go | 72 +++++++++++++++++++++++++++++ pkg/etcd/etcd.go | 6 ++- pkg/etcd/member_controller.go | 11 ++++- pkg/etcd/metadata_controller.go | 14 ++++-- pkg/server/server.go | 5 -- 6 files changed, 95 insertions(+), 77 deletions(-) delete mode 100644 pkg/apiaddresses/controller.go create mode 100644 pkg/etcd/apiaddresses_controller.go diff --git a/pkg/apiaddresses/controller.go b/pkg/apiaddresses/controller.go deleted file mode 100644 index b0ceefcdbc8..00000000000 --- a/pkg/apiaddresses/controller.go +++ /dev/null @@ -1,64 +0,0 @@ -package apiaddresses - -import ( - "bytes" - "context" - "encoding/json" - - "github.com/k3s-io/k3s/pkg/daemons/config" - "github.com/k3s-io/k3s/pkg/etcd" - "github.com/k3s-io/k3s/pkg/util" - "github.com/k3s-io/k3s/pkg/version" - controllerv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" - clientv3 "go.etcd.io/etcd/client/v3" - v1 "k8s.io/api/core/v1" -) - -type EndpointsControllerGetter func() controllerv1.EndpointsController - -func Register(ctx context.Context, runtime *config.ControlRuntime, endpoints controllerv1.EndpointsController) error { - h := &handler{ - endpointsController: endpoints, - runtime: runtime, - ctx: ctx, - } - endpoints.OnChange(ctx, version.Program+"-apiserver-lb-controller", h.sync) - - cl, err := etcd.GetClient(h.ctx, h.runtime) - if err != nil { - return err - } - h.etcdClient = cl - - go func() { - <-ctx.Done() - h.etcdClient.Close() - }() - - return nil -} - -type handler struct { - endpointsController controllerv1.EndpointsController - runtime *config.ControlRuntime - ctx context.Context - etcdClient *clientv3.Client -} - -// This controller will update the version.program/apiaddresses etcd key with a list of -// api addresses endpoints found in the kubernetes service in the default namespace -func (h *handler) sync(key string, endpoint *v1.Endpoints) (*v1.Endpoints, error) { - if endpoint != nil && - endpoint.Namespace == "default" && - endpoint.Name == "kubernetes" { - w := &bytes.Buffer{} - if err := json.NewEncoder(w).Encode(util.GetAddresses(endpoint)); err != nil { - return nil, err - } - _, err := h.etcdClient.Put(h.ctx, etcd.AddressKey, w.String()) - if err != nil { - return nil, err - } - } - return endpoint, nil -} diff --git a/pkg/etcd/apiaddresses_controller.go b/pkg/etcd/apiaddresses_controller.go new file mode 100644 index 00000000000..af181a81713 --- /dev/null +++ b/pkg/etcd/apiaddresses_controller.go @@ -0,0 +1,72 @@ +package etcd + +import ( + "bytes" + "context" + "encoding/json" + + "github.com/k3s-io/k3s/pkg/util" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" +) + +func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error { + if etcd.config.DisableAPIServer { + return nil + } + + endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints() + watch, err := endpoints.Watch(metav1.NamespaceDefault, metav1.ListOptions{ + FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(), + ResourceVersion: "0", + }) + if err != nil { + return err + } + + h := &handler{ + etcd: etcd, + watch: watch, + } + + logrus.Infof("Starting managed etcd apiserver addresses controller") + go h.watchEndpoints(ctx) + + return nil +} + +type handler struct { + etcd *ETCD + watch watch.Interface +} + +// This controller will update the version.program/apiaddresses etcd key with a list of +// api addresses endpoints found in the kubernetes service in the default namespace +func (h *handler) watchEndpoints(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-h.watch.ResultChan(): + endpoint, ok := ev.Object.(*v1.Endpoints) + if !ok { + logrus.Errorf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev) + continue + } + + w := &bytes.Buffer{} + if err := json.NewEncoder(w).Encode(util.GetAddresses(endpoint)); err != nil { + logrus.Warnf("Failed to encode apiserver addresses: %v", err) + continue + } + + _, err := h.etcd.client.Put(ctx, AddressKey, w.String()) + if err != nil { + logrus.Warnf("Failed to store apiserver addresses in etcd: %v", err) + } + } + } +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index b967011b098..587801d8a7a 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -535,13 +535,15 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt if err := e.setName(false); err != nil { return nil, err } + e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error { - RegisterMetadataHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node()) + registerMetadataHandlers(ctx, e) return nil } e.config.Runtime.LeaderElectedClusterControllerStart = func(ctx context.Context) error { - RegisterMemberHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node()) + registerMemberHandlers(ctx, e) + registerEndpointsHandlers(ctx, e) return nil } diff --git a/pkg/etcd/member_controller.go b/pkg/etcd/member_controller.go index b2aa3a5c0b3..c26df5c1dcf 100644 --- a/pkg/etcd/member_controller.go +++ b/pkg/etcd/member_controller.go @@ -11,12 +11,19 @@ import ( v1 "k8s.io/api/core/v1" ) -func RegisterMemberHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) { +func registerMemberHandlers(ctx context.Context, etcd *ETCD) { + if etcd.config.DisableETCD { + return + } + + nodes := etcd.config.Runtime.Core.Core().V1().Node() e := &etcdMemberHandler{ etcd: etcd, nodeController: nodes, ctx: ctx, } + + logrus.Infof("Starting managed etcd member removal controller") nodes.OnChange(ctx, "managed-etcd-controller", e.sync) nodes.OnRemove(ctx, "managed-etcd-controller", e.onRemove) } @@ -50,7 +57,7 @@ func (e *etcdMemberHandler) sync(key string, node *v1.Node) (*v1.Node, error) { if currentNodeName, ok := node.Annotations[NodeNameAnnotation]; ok { if currentNodeName != removed { // If the current node name is not the same as the removed node name, reset the tainted annotation and removed node name - logrus.Infof("Resetting removed node flag as removed node name ( did not match current node name") + logrus.Infof("Resetting removed node flag as removed node name (did not match current node name") delete(node.Annotations, removedNodeNameAnnotation) node.Annotations[removalAnnotation] = "false" return e.nodeController.Update(node) diff --git a/pkg/etcd/metadata_controller.go b/pkg/etcd/metadata_controller.go index 39b4bf3fd7b..915f5814da0 100644 --- a/pkg/etcd/metadata_controller.go +++ b/pkg/etcd/metadata_controller.go @@ -10,12 +10,19 @@ import ( v1 "k8s.io/api/core/v1" ) -func RegisterMetadataHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) { +func registerMetadataHandlers(ctx context.Context, etcd *ETCD) { + if etcd.config.DisableETCD { + return + } + + nodes := etcd.config.Runtime.Core.Core().V1().Node() h := &metadataHandler{ etcd: etcd, nodeController: nodes, ctx: ctx, } + + logrus.Infof("Starting managed etcd node label controller") nodes.OnChange(ctx, "managed-etcd-metadata-controller", h.sync) } @@ -32,7 +39,7 @@ func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) { nodeName := os.Getenv("NODE_NAME") if nodeName == "" { - logrus.Debug("waiting for node to be assigned for etcd controller") + logrus.Debug("waiting for node name to be assigned for managed etcd node label controller") m.nodeController.EnqueueAfter(key, 5*time.Second) return node, nil } @@ -48,8 +55,7 @@ func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) { if node.Annotations[NodeNameAnnotation] == m.etcd.name && node.Annotations[NodeAddressAnnotation] == m.etcd.address && node.Labels[EtcdRoleLabel] == "true" && - node.Labels[ControlPlaneLabel] == "true" || - m.etcd.config.DisableETCD { + node.Labels[ControlPlaneLabel] == "true" { return node, nil } diff --git a/pkg/server/server.go b/pkg/server/server.go index 56152417334..cc2b4f1eed5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -14,7 +14,6 @@ import ( "time" "github.com/k3s-io/helm-controller/pkg/helm" - "github.com/k3s-io/k3s/pkg/apiaddresses" "github.com/k3s-io/k3s/pkg/cli/cmds" "github.com/k3s-io/k3s/pkg/clientaccess" "github.com/k3s-io/k3s/pkg/daemons/config" @@ -236,10 +235,6 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error { return err } - if err := apiaddresses.Register(ctx, config.ControlConfig.Runtime, sc.Core.Core().V1().Endpoints()); err != nil { - return err - } - if config.ControlConfig.EncryptSecrets { if err := secretsencrypt.Register(ctx, sc.K8s,