package agent import ( "context" "fmt" "net" "os" "path/filepath" goruntime "runtime" "strconv" "strings" "sync" "time" systemd "github.com/coreos/go-systemd/v22/daemon" "github.com/k3s-io/k3s/pkg/agent/config" "github.com/k3s-io/k3s/pkg/agent/containerd" "github.com/k3s-io/k3s/pkg/agent/proxy" "github.com/k3s-io/k3s/pkg/agent/syssetup" "github.com/k3s-io/k3s/pkg/agent/tunnel" "github.com/k3s-io/k3s/pkg/certmonitor" "github.com/k3s-io/k3s/pkg/cgroups" "github.com/k3s-io/k3s/pkg/cli/cmds" "github.com/k3s-io/k3s/pkg/clientaccess" cp "github.com/k3s-io/k3s/pkg/cloudprovider" "github.com/k3s-io/k3s/pkg/daemons/agent" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/metrics" "github.com/k3s-io/k3s/pkg/nodeconfig" "github.com/k3s-io/k3s/pkg/profile" "github.com/k3s-io/k3s/pkg/rootless" "github.com/k3s-io/k3s/pkg/signals" "github.com/k3s-io/k3s/pkg/spegel" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/util/errors" "github.com/k3s-io/k3s/pkg/version" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" toolscache "k8s.io/client-go/tools/cache" toolswatch "k8s.io/client-go/tools/watch" "k8s.io/component-base/cli/globalflag" "k8s.io/component-base/logs" app2 "k8s.io/kubernetes/cmd/kube-proxy/app" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" utilsnet "k8s.io/utils/net" utilsptr "k8s.io/utils/ptr" ) func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { nodeConfig, err := config.Get(ctx, cfg, proxy) if err != nil { return errors.WithMessage(err, "failed to retrieve agent configuration") } dualCluster, err := utilsnet.IsDualStackCIDRs(nodeConfig.AgentConfig.ClusterCIDRs) if err != nil { return errors.WithMessage(err, "failed to validate cluster-cidr") } dualService, err := utilsnet.IsDualStackCIDRs(nodeConfig.AgentConfig.ServiceCIDRs) if err != nil { return errors.WithMessage(err, "failed to validate service-cidr") } dualNode, err := utilsnet.IsDualStackIPs(nodeConfig.AgentConfig.NodeIPs) if err != nil { return errors.WithMessage(err, "failed to validate node-ip") } serviceIPv4 := utilsnet.IsIPv4CIDR(nodeConfig.AgentConfig.ServiceCIDR) clusterIPv4 := utilsnet.IsIPv4CIDR(nodeConfig.AgentConfig.ClusterCIDR) nodeIPv4 := utilsnet.IsIPv4String(nodeConfig.AgentConfig.NodeIP) serviceIPv6 := utilsnet.IsIPv6CIDR(nodeConfig.AgentConfig.ServiceCIDR) clusterIPv6 := utilsnet.IsIPv6CIDR(nodeConfig.AgentConfig.ClusterCIDR) nodeIPv6 := utilsnet.IsIPv6String(nodeConfig.AgentConfig.NodeIP) // check that cluster-cidr and service-cidr have the same IP versions if (serviceIPv6 != clusterIPv6) || (dualCluster != dualService) || (serviceIPv4 != clusterIPv4) { return fmt.Errorf("cluster-cidr: %v and service-cidr: %v, must share the same IP version (IPv4, IPv6 or dual-stack)", nodeConfig.AgentConfig.ClusterCIDRs, nodeConfig.AgentConfig.ServiceCIDRs) } // check that node-ip has the IP versions set in cluster-cidr if (clusterIPv6 && !(nodeIPv6 || dualNode)) || (dualCluster && !dualNode) || (clusterIPv4 && !(nodeIPv4 || dualNode)) { return fmt.Errorf("cluster-cidr: %v and node-ip: %v, must share the same IP version (IPv4, IPv6 or dual-stack)", nodeConfig.AgentConfig.ClusterCIDRs, nodeConfig.AgentConfig.NodeIPs) } enableIPv6 := dualCluster || clusterIPv6 enableIPv4 := dualCluster || clusterIPv4 // dualStack or IPv6 are not supported on Windows node if (goruntime.GOOS == "windows") && enableIPv6 { return errors.New("dual-stack or IPv6 are not supported on Windows node") } conntrackConfig, err := getConntrackConfig(nodeConfig) if err != nil { return errors.WithMessage(err, "failed to validate kube-proxy conntrack configuration") } syssetup.Configure(enableIPv6, conntrackConfig) nodeConfig.AgentConfig.EnableIPv4 = enableIPv4 nodeConfig.AgentConfig.EnableIPv6 = enableIPv6 if err := executor.Bootstrap(ctx, nodeConfig, cfg); err != nil { return err } if nodeConfig.EmbeddedRegistry { if nodeConfig.Docker || nodeConfig.ContainerRuntimeEndpoint != "" { return errors.New("embedded registry mirror requires embedded containerd") } if err := spegel.DefaultRegistry.Start(ctx, nodeConfig, executor.CRIReadyChan()); err != nil { return errors.WithMessage(err, "failed to start embedded registry") } } if nodeConfig.SupervisorMetrics { if err := metrics.DefaultMetrics.Start(ctx, nodeConfig); err != nil { return errors.WithMessage(err, "failed to serve metrics") } } if nodeConfig.EnablePProf { if err := profile.DefaultProfiler.Start(ctx, nodeConfig); err != nil { return errors.WithMessage(err, "failed to serve pprof") } } if err := setupCriCtlConfig(cfg, nodeConfig); err != nil { return err } // Create a new context to use for agent components that is cancelled on a // delay after the signal context. This allows other things (like etcd) to // clean up, before agent components exit when their contexts are cancelled. ctx = util.DelayCancel(ctx, util.DefaultContextDelay) notifySocket := os.Getenv("NOTIFY_SOCKET") os.Unsetenv("NOTIFY_SOCKET") go func() { if err := startCRI(ctx, nodeConfig); err != nil { signals.RequestShutdown(errors.WithMessage(err, "failed to start container runtime")) } }() if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil { return err } go func() { <-executor.APIServerReadyChan() if err := startNetwork(ctx, &sync.WaitGroup{}, nodeConfig); err != nil { signals.RequestShutdown(errors.WithMessage(err, "failed to start networking")) return } // By default, the server is responsible for notifying systemd // On agent-only nodes, the agent will notify systemd if notifySocket != "" { logrus.Info(version.Program + " agent is up and running") os.Setenv("NOTIFY_SOCKET", notifySocket) systemd.SdNotify(true, "READY=1\n") } }() return nil } // startCRI starts the configured CRI, or waits for an external CRI to be ready. func startCRI(ctx context.Context, nodeConfig *daemonconfig.Node) error { if nodeConfig.Docker { return executor.Docker(ctx, nodeConfig) } else if nodeConfig.ContainerRuntimeEndpoint == "" { if err := containerd.SetupContainerdConfig(nodeConfig); err != nil { return err } return executor.Containerd(ctx, nodeConfig) } return executor.CRI(ctx, nodeConfig) } // startNetwork updates the network annotations on the node and starts the CNI func startNetwork(ctx context.Context, wg *sync.WaitGroup, nodeConfig *daemonconfig.Node) error { // Use the kubelet kubeconfig to update annotations on the local node kubeletClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet) if err != nil { return err } if err := configureNode(ctx, nodeConfig, kubeletClient); err != nil { return err } return executor.CNI(ctx, wg, nodeConfig) } // getConntrackConfig uses the kube-proxy code to parse the user-provided kube-proxy-arg values, and // extract the conntrack settings so that K3s can set them itself. This allows us to soft-fail when // running K3s in Docker, where kube-proxy is no longer allowed to set conntrack sysctls on newer kernels. // When running rootless, we do not attempt to set conntrack sysctls - this behavior is copied from kubeadm. func getConntrackConfig(nodeConfig *daemonconfig.Node) (*kubeproxyconfig.KubeProxyConntrackConfiguration, error) { ctConfig := &kubeproxyconfig.KubeProxyConntrackConfiguration{ MaxPerCore: utilsptr.To(int32(0)), Min: utilsptr.To(int32(0)), TCPEstablishedTimeout: &metav1.Duration{}, TCPCloseWaitTimeout: &metav1.Duration{}, } if nodeConfig.AgentConfig.Rootless { return ctConfig, nil } cmd := app2.NewProxyCommand() globalflag.AddGlobalFlags(cmd.Flags(), cmd.Name(), logs.SkipLoggingConfigurationFlags()) if err := cmd.ParseFlags(util.GetArgs(map[string]string{}, nodeConfig.AgentConfig.ExtraKubeProxyArgs)); err != nil { return nil, err } maxPerCore, err := cmd.Flags().GetInt32("conntrack-max-per-core") if err != nil { return nil, err } ctConfig.MaxPerCore = &maxPerCore ctMin, err := cmd.Flags().GetInt32("conntrack-min") if err != nil { return nil, err } ctConfig.Min = &ctMin establishedTimeout, err := cmd.Flags().GetDuration("conntrack-tcp-timeout-established") if err != nil { return nil, err } ctConfig.TCPEstablishedTimeout.Duration = establishedTimeout closeWaitTimeout, err := cmd.Flags().GetDuration("conntrack-tcp-timeout-close-wait") if err != nil { return nil, err } ctConfig.TCPCloseWaitTimeout.Duration = closeWaitTimeout return ctConfig, nil } // RunStandalone bootstraps the executor, but does not run the kubelet or containerd. // This allows other bits of code that expect the executor to be set up properly to function // even when the agent is disabled. func RunStandalone(ctx context.Context, wg *sync.WaitGroup, cfg cmds.Agent) error { proxy, err := createProxyAndValidateToken(ctx, &cfg) if err != nil { return err } nodeConfig, err := config.Get(ctx, cfg, proxy) if err != nil { return errors.WithMessage(err, "failed to retrieve agent configuration") } if err := executor.Bootstrap(ctx, nodeConfig, cfg); err != nil { return err } // this is a no-op just to get the cri ready channel closed if err := executor.CRI(ctx, nodeConfig); err != nil { return err } if err := tunnelSetup(ctx, nodeConfig, cfg, proxy); err != nil { return err } if err := certMonitorSetup(ctx, nodeConfig, cfg); err != nil { return err } if nodeConfig.SupervisorMetrics { if err := metrics.DefaultMetrics.Start(ctx, nodeConfig); err != nil { return errors.WithMessage(err, "failed to serve metrics") } } if nodeConfig.EnablePProf { if err := profile.DefaultProfiler.Start(ctx, nodeConfig); err != nil { return errors.WithMessage(err, "failed to serve pprof") } } return nil } // Run sets up cgroups, configures the LB proxy, and triggers startup // of containerd and kubelet. func Run(ctx context.Context, wg *sync.WaitGroup, cfg cmds.Agent) error { if err := cgroups.Validate(); err != nil { return err } if cfg.Rootless && !cfg.RootlessAlreadyUnshared { dualNode, err := utilsnet.IsDualStackIPStrings(cfg.NodeIP.Value()) if err != nil { return err } if err := rootless.Rootless(cfg.DataDir, dualNode); err != nil { return err } } proxy, err := createProxyAndValidateToken(ctx, &cfg) if err != nil { return err } return run(ctx, cfg, proxy) } func createProxyAndValidateToken(ctx context.Context, cfg *cmds.Agent) (proxy.Proxy, error) { agentDir := filepath.Join(cfg.DataDir, "agent") clientKubeletCert := filepath.Join(agentDir, "client-kubelet.crt") clientKubeletKey := filepath.Join(agentDir, "client-kubelet.key") if err := os.MkdirAll(agentDir, 0700); err != nil { return nil, err } _, nodeIPs, err := util.GetHostnameAndIPs(cfg.NodeName, cfg.NodeIP.Value()) if err != nil { return nil, errors.WithMessage(err, "failed to get node name and addresses") } proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort, utilsnet.IsIPv6(nodeIPs[0])) if err != nil { return nil, err } options := []clientaccess.ValidationOption{ clientaccess.WithUser("node"), clientaccess.WithClientCertificate(clientKubeletCert, clientKubeletKey), } for { newToken, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), cfg.Token, options...) if err != nil { logrus.Errorf("Failed to validate connection to cluster at %s: %v", cfg.ServerURL, err) select { case <-ctx.Done(): return nil, ctx.Err() case <-time.After(2 * time.Second): } continue } cfg.Token = newToken.String() break } return proxy, nil } // configureNode waits for the node object to be created, and if/when it does, // ensures that the labels and annotations are up to date. func configureNode(ctx context.Context, nodeConfig *daemonconfig.Node, coreClient kubernetes.Interface) error { patcher := util.NewPatcher[*v1.Node](coreClient.CoreV1().Nodes()) lw := toolscache.NewListWatchFromClient(coreClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceNone, fields.OneTermEqualSelector(metav1.ObjectNameField, nodeConfig.AgentConfig.NodeName)) condition := func(ev watch.Event) (bool, error) { node, ok := ev.Object.(*v1.Node) if !ok { return false, errors.New("event object not of type v1.Node") } patch := util.NewPatchList() updateMutableLabels(&nodeConfig.AgentConfig, patch) if nodeConfig.AgentConfig.DisableCCM { removeAddressAnnotations(patch, node) removeLegacyAddressLabels(patch, node) } else { updateAddressAnnotations(&nodeConfig.AgentConfig, patch, node) updateLegacyAddressLabels(&nodeConfig.AgentConfig, patch, node) } // inject node config nodeconfig.SetNodeConfigAnnotations(nodeConfig, patch, node) nodeconfig.SetNodeConfigLabels(nodeConfig, patch, node) if _, err := patcher.Patch(ctx, patch, nodeConfig.AgentConfig.NodeName); err != nil { logrus.Infof("Failed to set annotations and labels on node %s: %v", nodeConfig.AgentConfig.NodeName, err) return false, nil } logrus.Infof("Annotations and labels have been set successfully on node: %s", nodeConfig.AgentConfig.NodeName) return true, nil } if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil { return errors.WithMessage(err, "failed to configure node") } return nil } func updateMutableLabels(agentConfig *daemonconfig.Agent, patch *util.PatchList) { for _, m := range agentConfig.NodeLabels { var ( v string p = strings.SplitN(m, `=`, 2) k = p[0] ) if len(p) > 1 { v = p[1] } patch.Add(v, "metadata", "labels", k) } } func updateLegacyAddressLabels(agentConfig *daemonconfig.Agent, patch *util.PatchList, node *v1.Node) { ls := labels.Set(node.Labels) if ls.Has(cp.InternalIPKey) || ls.Has(cp.HostnameKey) { patch.Add(agentConfig.NodeIP, "metadata", "labels", cp.InternalIPKey) patch.Add(getHostname(agentConfig), "metadata", "labels", cp.HostnameKey) if agentConfig.NodeExternalIP != "" { patch.Add(agentConfig.NodeExternalIP, "metadata", "labels", cp.ExternalIPKey) } } } func removeLegacyAddressLabels(patch *util.PatchList, node *v1.Node) { for _, key := range []string{cp.HostnameKey, cp.InternalIPKey, cp.ExternalIPKey} { if _, ok := node.Labels[key]; ok { patch.Remove("metadata", "labels", key) } } } // updateAddressAnnotations updates the node annotations with important information about IP addresses of the node func updateAddressAnnotations(agentConfig *daemonconfig.Agent, patch *util.PatchList, node *v1.Node) { patch.Add(util.JoinIPs(agentConfig.NodeIPs), "metadata", "annotations", cp.InternalIPKey) patch.Add(getHostname(agentConfig), "metadata", "annotations", cp.HostnameKey) if agentConfig.NodeExternalIP != "" { patch.Add(util.JoinIPs(agentConfig.NodeExternalIPs), "metadata", "annotations", cp.ExternalIPKey) } if len(agentConfig.NodeInternalDNSs) > 0 { patch.Add(strings.Join(agentConfig.NodeInternalDNSs, ","), "metadata", "annotations", cp.InternalDNSKey) } else if _, ok := node.Annotations[cp.InternalDNSKey]; ok { patch.Remove("metadata", "annotations", cp.InternalDNSKey) } if len(agentConfig.NodeExternalDNSs) > 0 { patch.Add(strings.Join(agentConfig.NodeExternalDNSs, ","), "metadata", "annotations", cp.ExternalDNSKey) } else if _, ok := node.Annotations[cp.ExternalDNSKey]; ok { patch.Remove("metadata", "annotations", cp.ExternalDNSKey) } } func removeAddressAnnotations(patch *util.PatchList, node *v1.Node) { for _, key := range []string{cp.HostnameKey, cp.InternalIPKey, cp.ExternalIPKey, cp.InternalDNSKey, cp.ExternalDNSKey} { if _, ok := node.Annotations[key]; ok { patch.Remove("metadata", "annotations", key) } } } // setupTunnelAndRunAgent starts the agent tunnel, cert expiry monitoring, and // runs the Agent (cri+kubelet). On etcd-only nodes, an extra goroutine is // started to retrieve apiserver addresses from the datastore. On other node // types, this is done later by the tunnel setup, which starts goroutines to // watch apiserver endpoints. func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error { // only need to get apiserver addresses from the datastore on an etcd-only node that is not being reset if !cfg.ClusterReset && cfg.ETCDAgent { go waitForAPIServerAddresses(ctx, nodeConfig, cfg, proxy) } if err := tunnelSetup(ctx, nodeConfig, cfg, proxy); err != nil { return err } if err := certMonitorSetup(ctx, nodeConfig, cfg); err != nil { return err } return agent.Agent(ctx, nodeConfig, proxy) } // waitForAPIServerAddresses syncs apiserver addresses from the datastore. This // is also handled by the agent tunnel watch, but on etcd-only nodes we need to // read apiserver addresses from APIAddressCh before the agent has a // connection to the apiserver. This does not return until addresses or set, // or the context is cancelled. func waitForAPIServerAddresses(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) { var localSupervisorDefault bool if addresses := proxy.SupervisorAddresses(); len(addresses) > 0 { host, _, _ := net.SplitHostPort(addresses[0]) if host == "127.0.0.1" || host == "::1" { localSupervisorDefault = true } } for { select { case <-time.After(5 * time.Second): logrus.Info("Waiting for control-plane node to register apiserver addresses in etcd") case addresses := <-cfg.APIAddressCh: for i, a := range addresses { host, _, err := net.SplitHostPort(a) if err == nil { addresses[i] = net.JoinHostPort(host, strconv.Itoa(nodeConfig.ServerHTTPSPort)) } } // If this is an etcd-only node that started up using its local supervisor, // switch to using a control-plane node as the supervisor. Otherwise, leave the // configured server address as the default. if localSupervisorDefault && len(addresses) > 0 { proxy.SetSupervisorDefault(addresses[0]) } proxy.Update(addresses) return case <-ctx.Done(): return } } } // tunnelSetup calls tunnel setup, unless the embedded etc cluster is being reset/restored, in which case // this is unnecessary as the kubelet is only needed to manage static pods and does not need to establish // tunneled connections to other cluster members. func tunnelSetup(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error { if cfg.ClusterReset { return nil } return tunnel.Setup(ctx, nodeConfig, proxy) } func certMonitorSetup(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error { if cfg.ClusterReset { return nil } return certmonitor.Setup(ctx, nodeConfig, cfg.DataDir) } // getHostname returns the actual system hostname. // If the hostname cannot be determined, or is invalid, the node name is used. func getHostname(agentConfig *daemonconfig.Agent) string { hostname, err := os.Hostname() if err != nil || hostname == "" || strings.Contains(hostname, "localhost") { return agentConfig.NodeName } return hostname }