k3s/pkg/executor/embed/embed.go
Brad Davidson f891548e32
Some checks are pending
Scorecard supply-chain security / Scorecard analysis (push) Waiting to run
Fix embedded executor VPN config injection
Allow the executor to modify node config before certs are generated, and use this to add VPN node IPs to kubelet serving cert
2026-04-14 09:03:05 -07:00

448 lines
13 KiB
Go

//go:build !no_embedded_executor
package embed
import (
"context"
"flag"
"fmt"
"net"
"net/http"
"os/exec"
"path/filepath"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/agent/containerd"
"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/agent/cridockerd"
"github.com/k3s-io/k3s/pkg/agent/flannel"
"github.com/k3s-io/k3s/pkg/agent/netpol"
"github.com/k3s-io/k3s/pkg/cli/cmds"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/executor/embed/etcd"
"github.com/k3s-io/k3s/pkg/signals"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/util/errors"
"github.com/k3s-io/k3s/pkg/version"
"github.com/k3s-io/k3s/pkg/vpn"
"github.com/sirupsen/logrus"
"k8s.io/apiserver/pkg/authentication/authenticator"
cloudprovider "k8s.io/cloud-provider"
ccmapp "k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
"k8s.io/cloud-provider/names"
ccmopt "k8s.io/cloud-provider/options"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2"
apiapp "k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
utilsnet "k8s.io/utils/net"
// registering k3s cloud provider
_ "github.com/k3s-io/k3s/pkg/cloudprovider"
)
// once guards the log-verbosity goroutine launched from Bootstrap so that it
// is started at most one time, no matter how often Bootstrap is called.
var once sync.Once
// init hands a zero-value Embedded to the executor package via executor.Set
// when this package is linked in.
func init() {
	executor.Set(new(Embedded))
}
// Compile-time assertions that *Embedded satisfies every interface it is used as.
var (
	_ executor.Executor          = (*Embedded)(nil)
	_ executor.PreparingExecutor = (*Embedded)(nil)
	_ vpn.InfoProvider           = (*Embedded)(nil)
)
// Embedded is the executor implementation provided by this package. Its
// methods build the component commands and run them in goroutines of the
// current process.
type Embedded struct {
	// apiServerReady is assigned in Bootstrap from util.APIServerReadyChan and
	// handed out by APIServerReadyChan.
	apiServerReady <-chan struct{}

	// etcdReady is created in Bootstrap and closed by ETCD once the supplied
	// test function succeeds. criReady is created in Bootstrap and passed to
	// executor.CloseIfNilErr by Containerd, Docker and CRI (presumably closed
	// there when the error is nil; see that helper to confirm).
	etcdReady, criReady chan struct{}

	// nodeConfig is the node configuration stored by Bootstrap; KubeProxy
	// reads it to compute platform-specific arguments.
	nodeConfig *daemonconfig.Node

	// vpnInfo is populated by Prepare when VPN auth is configured, and is
	// returned by GetVPNInfo.
	vpnInfo *vpn.Info
}
// Prepare modifies the node config prior to downloading client and server certificates from the server.
// If node IPs or names need to be modified, it should be done here so that the kubelet client and serving
// certs are valid.
//
// When VPN auth is configured (inline through cfg.VPNAuth, or loaded from cfg.VPNAuthFile), the VPN is
// started, its info is stored on the executor, and the node IPs and flannel interface in nodeConfig are
// replaced with the VPN's addresses and interface. When no VPN auth is configured, nodeConfig is left
// untouched and nil is returned.
//
// An error is returned if the auth file cannot be read, the VPN cannot be started or queried, the node has
// no IPs to derive the primary address family from, the VPN has no address in the node's primary family,
// or the VPN interface cannot be found.
func (e *Embedded) Prepare(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
	// If there is a VPN, we must start it early to overwrite NodeIP and flannel interface
	var err error
	if cfg.VPNAuthFile != "" {
		cfg.VPNAuth, err = util.ReadFile(ctx, cfg.VPNAuthFile)
		if err != nil {
			return errors.WithMessage(err, "failed to read vpn-auth-file")
		}
	}
	if cfg.VPNAuth == "" {
		return nil
	}

	if err = vpn.StartVPN(cfg.VPNAuth); err != nil {
		return err
	}
	e.vpnInfo, err = vpn.GetInfo(cfg.VPNAuth)
	if err != nil {
		return err
	}

	// The first node IP decides the primary address family; without one there is nothing to match the
	// VPN addresses against, so fail cleanly instead of indexing an empty slice.
	nodeIPs := nodeConfig.AgentConfig.NodeIPs
	if len(nodeIPs) == 0 {
		return errors.New("unable to assign VPN addresses to node: node has no IPs configured")
	}

	// Pass ipv4, ipv6 or both depending on nodeIPs mode. The address matching the node's primary family
	// always goes first; the other family is appended when the VPN provides it.
	var vpnIPs []net.IP
	switch {
	case utilsnet.IsIPv4(nodeIPs[0]) && e.vpnInfo.IPv4Address != nil:
		vpnIPs = append(vpnIPs, e.vpnInfo.IPv4Address)
		if e.vpnInfo.IPv6Address != nil {
			vpnIPs = append(vpnIPs, e.vpnInfo.IPv6Address)
		}
	case utilsnet.IsIPv6(nodeIPs[0]) && e.vpnInfo.IPv6Address != nil:
		vpnIPs = append(vpnIPs, e.vpnInfo.IPv6Address)
		if e.vpnInfo.IPv4Address != nil {
			vpnIPs = append(vpnIPs, e.vpnInfo.IPv4Address)
		}
	default:
		return fmt.Errorf("address family mismatch when assigning VPN addresses to node: node=%v, VPN ipv4=%v ipv6=%v", nodeIPs, e.vpnInfo.IPv4Address, e.vpnInfo.IPv6Address)
	}

	// Overwrite nodeip and flannel interface and throw a warning if user explicitly set those parameters.
	// vpnIPs holds at least one address here: both non-returning branches above append one.
	logrus.Infof("Node-ip changed to %v due to VPN", vpnIPs)
	if len(cfg.NodeIP.Value()) != 0 {
		logrus.Warn("VPN provider overrides configured node-ip parameter")
	}
	if len(cfg.NodeExternalIP.Value()) != 0 {
		logrus.Warn("VPN provider overrides node-external-ip parameter")
	}
	nodeConfig.AgentConfig.NodeIPs = vpnIPs
	nodeConfig.AgentConfig.NodeIP = vpnIPs[0].String()
	nodeConfig.Flannel.Iface, err = net.InterfaceByName(e.vpnInfo.Interface)
	if err != nil {
		return errors.WithMessagef(err, "unable to find vpn interface: %s", e.vpnInfo.Interface)
	}
	return nil
}
// Bootstrap stores the node config on the executor, creates the readiness
// channels used by the other methods, starts (once per process) a goroutine
// that keeps the klog verbosity pinned during startup, and, unless the
// flannel backend is disabled, fills in the flannel and CNI paths in nodeConfig.
//
// It returns an error if the configured flannel interface cannot be found or
// the host-local binary is not on the PATH.
func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
	e.apiServerReady = util.APIServerReadyChan(ctx, nodeConfig.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout)
	e.etcdReady, e.criReady = make(chan struct{}), make(chan struct{})
	e.nodeConfig = nodeConfig

	go once.Do(func() {
		// Each Kubernetes component initializes klog and resets the verbosity
		// flag while it starts. To keep the configured level in effect, the
		// flag is re-applied once per second for the first two minutes of
		// startup.
		verbosityCtx, stop := context.WithTimeout(ctx, 120*time.Second)
		defer stop()
		klog.InitFlags(nil)
		for {
			flag.Set("v", strconv.Itoa(cmds.LogConfig.VLevel))
			select {
			case <-verbosityCtx.Done():
				return
			case <-time.After(time.Second):
			}
		}
	})

	// Everything below only applies to the embedded flannel.
	if nodeConfig.Flannel.Backend == flannel.BackendNone {
		return nil
	}

	if len(cfg.FlannelIface) > 0 {
		var lookupErr error
		nodeConfig.Flannel.Iface, lookupErr = net.InterfaceByName(cfg.FlannelIface)
		if lookupErr != nil {
			return errors.WithMessagef(lookupErr, "unable to find interface %s", cfg.FlannelIface)
		}
	}

	// The CNI bin dir is wherever the host-local plugin lives.
	hostLocalPath, lookErr := exec.LookPath("host-local")
	if lookErr != nil {
		return errors.WithMessagef(lookErr, "failed to find host-local")
	}

	// A user-supplied flannel config takes precedence over the default one
	// under the data dir and is flagged as an override.
	if cfg.FlannelConf != "" {
		nodeConfig.Flannel.ConfFile = cfg.FlannelConf
		nodeConfig.Flannel.ConfOverride = true
	} else {
		nodeConfig.Flannel.ConfFile = filepath.Join(cfg.DataDir, "agent", "etc", "flannel", "net-conf.json")
	}
	nodeConfig.AgentConfig.CNIBinDir = filepath.Dir(hostLocalPath)
	nodeConfig.AgentConfig.CNIConfDir = filepath.Join(cfg.DataDir, "agent", "etc", "cni", "net.d")

	// When a VPN is in use, flannel must run with that VPN's backend.
	if e.vpnInfo != nil {
		nodeConfig.Flannel.Backend = e.vpnInfo.ProviderName
	}
	return nil
}
// Kubelet builds the kubelet command with the given args and runs it in a
// background goroutine once the API server ready channel is signalled.
// It always returns nil; failures are reported through signals.RequestShutdown.
func (e *Embedded) Kubelet(ctx context.Context, args []string) error {
	cmd := kubelet.NewKubeletCommand(context.Background())
	cmd.SetArgs(args)

	run := func() {
		<-e.APIServerReadyChan()

		// A panic from the component is logged together with its stack at fatal level.
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("kubelet panic: %v", r)
			}
		}()

		// Context cancellation is a normal exit and is not reported as an error.
		if runErr := cmd.ExecuteContext(ctx); runErr != nil && !errors.Is(runErr, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(runErr, "kubelet exited"))
		}
		signals.RequestShutdown(nil)
	}
	go run()
	return nil
}
// KubeProxy builds the kube-proxy command, merging the platform-specific
// arguments derived from the stored node config with the given args, and runs
// it in a background goroutine once the API server ready channel is signalled.
// It always returns nil; failures are reported through signals.RequestShutdown.
func (e *Embedded) KubeProxy(ctx context.Context, args []string) error {
	cmd := proxy.NewProxyCommand()
	platformArgs := platformKubeProxyArgs(e.nodeConfig)
	cmd.SetArgs(util.GetArgs(platformArgs, args))

	run := func() {
		<-e.APIServerReadyChan()

		// A panic from the component is logged together with its stack at fatal level.
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", r)
			}
		}()

		// Context cancellation is a normal exit and is not reported as an error.
		if runErr := cmd.ExecuteContext(ctx); runErr != nil && !errors.Is(runErr, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(runErr, "kube-proxy exited"))
		}
		signals.RequestShutdown(nil)
	}
	go run()
	return nil
}
// APIServerHandlers waits for the embedded apiserver to publish its startup
// configuration on apiapp.StartupConfig and returns the authenticator and
// HTTP handler taken from it.
//
// If ctx is cancelled before the configuration arrives, the wait is abandoned
// and ctx.Err() is returned with nil handlers, instead of blocking forever.
func (*Embedded) APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
	select {
	case startupConfig := <-apiapp.StartupConfig:
		return startupConfig.Authenticator, startupConfig.Handler, nil
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	}
}
// APIServer builds the kube-apiserver command with the given args and runs it
// in a background goroutine once the etcd ready channel is signalled.
// It always returns nil; failures are reported through signals.RequestShutdown.
func (e *Embedded) APIServer(ctx context.Context, args []string) error {
	cmd := apiapp.NewAPIServerCommand(ctx.Done())
	cmd.SetArgs(args)

	run := func() {
		<-e.ETCDReadyChan()

		// A panic from the component is logged together with its stack at fatal level.
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("apiserver panic: %v", r)
			}
		}()

		// Context cancellation is a normal exit and is not reported as an error.
		if runErr := cmd.ExecuteContext(ctx); runErr != nil && !errors.Is(runErr, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(runErr, "apiserver exited"))
		}
		signals.RequestShutdown(nil)
	}
	go run()
	return nil
}
// Scheduler builds the kube-scheduler command with the given args and runs it
// in a background goroutine after first the API server ready channel and then
// nodeReady have been signalled.
// It always returns nil; failures are reported through signals.RequestShutdown.
func (e *Embedded) Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
	cmd := sapp.NewSchedulerCommand(ctx.Done())
	cmd.SetArgs(args)

	run := func() {
		<-e.APIServerReadyChan()
		<-nodeReady

		// A panic from the component is logged together with its stack at fatal level.
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("scheduler panic: %v", r)
			}
		}()

		// Context cancellation is a normal exit and is not reported as an error.
		if runErr := cmd.ExecuteContext(ctx); runErr != nil && !errors.Is(runErr, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(runErr, "scheduler exited"))
		}
		signals.RequestShutdown(nil)
	}
	go run()
	return nil
}
// ControllerManager builds the kube-controller-manager command with the given
// args and runs it in a background goroutine once the API server ready
// channel is signalled.
// It always returns nil; failures are reported through signals.RequestShutdown.
func (e *Embedded) ControllerManager(ctx context.Context, args []string) error {
	cmd := cmapp.NewControllerManagerCommand()
	cmd.SetArgs(args)

	run := func() {
		<-e.APIServerReadyChan()

		// A panic from the component is logged together with its stack at fatal level.
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("controller-manager panic: %v", r)
			}
		}()

		// Context cancellation is a normal exit and is not reported as an error.
		if runErr := cmd.ExecuteContext(ctx); runErr != nil && !errors.Is(runErr, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(runErr, "controller-manager exited"))
		}
		signals.RequestShutdown(nil)
	}
	go run()
	return nil
}
// CloudControllerManager builds the cloud-controller-manager command with the
// given args and runs it in a background goroutine once ccmRBACReady has been
// signalled. The cloud provider is looked up under the name version.Program
// when the command initializes.
//
// Failure to build the default options, or to initialize the cloud provider,
// is logged at fatal level. The function itself always returns nil; runtime
// failures are reported through signals.RequestShutdown.
func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
	opts, optsErr := ccmopt.NewCloudControllerManagerOptions()
	if optsErr != nil {
		logrus.Fatalf("unable to initialize command options: %v", optsErr)
	}

	// initCloud resolves the cloud provider from the completed config.
	initCloud := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface {
		provider, initErr := cloudprovider.InitCloudProvider(version.Program, config.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
		if initErr != nil {
			logrus.Fatalf("Cloud provider could not be initialized: %v", initErr)
		}
		if provider == nil {
			logrus.Fatalf("Cloud provider is nil")
		}
		return provider
	}

	aliases := names.CCMControllerAliases()
	cmd := ccmapp.NewCloudControllerManagerCommand(
		opts,
		initCloud,
		ccmapp.DefaultInitFuncConstructors,
		aliases,
		cliflag.NamedFlagSets{},
		ctx.Done(),
	)
	cmd.SetArgs(args)

	run := func() {
		<-ccmRBACReady

		// A panic from the component is logged together with its stack at fatal level.
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("cloud-controller-manager panic: %v", r)
			}
		}()

		// Context cancellation is a normal exit and is not reported as an error.
		if runErr := cmd.ExecuteContext(ctx); runErr != nil && !errors.Is(runErr, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(runErr, "cloud-controller-manager exited"))
		}
		signals.RequestShutdown(nil)
	}
	go run()
	return nil
}
// CurrentETCDOptions always returns zero-value initial options and a nil error.
func (e *Embedded) CurrentETCDOptions() (executor.InitialOptions, error) {
	var opts executor.InitialOptions
	return opts, nil
}
// ETCD starts the embedded etcd server through etcd.StartETCD and returns its
// result.
//
// If the executor has an etcd ready channel, a goroutine is started first
// that calls the provided test function every five seconds until it succeeds,
// then closes the ready channel. The test function is responsible for making
// sure that the etcd server is up and accepting client requests. Polling
// stops early if ctx is cancelled.
func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string, test executor.TestFunc) error {
	if e.etcdReady != nil {
		go func() {
			for {
				testErr := test(ctx, true)
				if testErr == nil {
					logrus.Info("Connection to etcd is ready")
					close(e.etcdReady)
					return
				}
				logrus.Infof("Failed to test etcd connection: %v", testErr)

				select {
				case <-ctx.Done():
					return
				case <-time.After(5 * time.Second):
				}
			}
		}()
	}
	return etcd.StartETCD(ctx, wg, args, extraArgs)
}
// Containerd runs the embedded containerd with the given node config and
// passes the result, along with the CRI ready channel, to executor.CloseIfNilErr.
func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error {
	runErr := containerd.Run(ctx, cfg)
	return executor.CloseIfNilErr(runErr, e.criReady)
}
// Docker runs the embedded cri-dockerd with the given node config and passes
// the result, along with the CRI ready channel, to executor.CloseIfNilErr.
func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error {
	runErr := cridockerd.Run(ctx, cfg)
	return executor.CloseIfNilErr(runErr, e.criReady)
}
// CRI waits for the CRI service at the configured container runtime endpoint
// and passes the result, along with the CRI ready channel, to
// executor.CloseIfNilErr. The wait is skipped for agentless nodes.
func (e *Embedded) CRI(ctx context.Context, cfg *daemonconfig.Node) error {
	// agentless sets cri socket path to /dev/null to indicate no CRI is needed
	if cfg.ContainerRuntimeEndpoint == "/dev/null" {
		return executor.CloseIfNilErr(nil, e.criReady)
	}
	waitErr := cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI")
	return executor.CloseIfNilErr(waitErr, e.criReady)
}
// CNI prepares and runs the embedded flannel, unless the flannel backend is
// disabled, and then runs the network policy controller, unless it is
// disabled in the agent config. The first error encountered is returned.
func (e *Embedded) CNI(ctx context.Context, wg *sync.WaitGroup, cfg *daemonconfig.Node) error {
	if cfg.Flannel.Backend != flannel.BackendNone {
		// Warn about questionable combinations of flannel-external-ip settings.
		switch {
		case !cfg.Flannel.ExternalIP:
			// External addresses are not in use; nothing to warn about.
		case len(cfg.AgentConfig.NodeExternalIPs) == 0:
			logrus.Warnf("Server has flannel-external-ip flag set but this node does not set node-external-ip. Flannel will use internal address when connecting to this node.")
		case cfg.Flannel.Backend != flannel.BackendWireguardNative:
			logrus.Warnf("Flannel is using external addresses with an insecure backend: %v. Please consider using an encrypting flannel backend.", cfg.Flannel.Backend)
		}

		prepareErr := flannel.Prepare(ctx, cfg)
		if prepareErr != nil {
			return prepareErr
		}
		runErr := flannel.Run(ctx, wg, cfg)
		if runErr != nil {
			return runErr
		}
	}

	if cfg.AgentConfig.DisableNPC {
		return nil
	}
	if npcErr := netpol.Run(ctx, wg, cfg); npcErr != nil {
		return npcErr
	}
	return nil
}
// APIServerReadyChan returns the API server ready channel set up by Bootstrap.
// It panics if the channel has not been set.
func (e *Embedded) APIServerReadyChan() <-chan struct{} {
	if ready := e.apiServerReady; ready != nil {
		return ready
	}
	panic("executor not bootstrapped")
}
// ETCDReadyChan returns the etcd ready channel created by Bootstrap.
// It panics if the channel has not been created.
func (e *Embedded) ETCDReadyChan() <-chan struct{} {
	if ready := e.etcdReady; ready != nil {
		return ready
	}
	panic("executor not bootstrapped")
}
// CRIReadyChan returns the CRI ready channel created by Bootstrap.
// It panics if the channel has not been created.
func (e *Embedded) CRIReadyChan() <-chan struct{} {
	if ready := e.criReady; ready != nil {
		return ready
	}
	panic("executor not bootstrapped")
}
// IsSelfHosted always reports false for the embedded executor.
func (Embedded) IsSelfHosted() bool {
	return false
}
// GetVPNInfo returns the VPN info recorded by Prepare, or an error if none
// has been recorded.
func (e Embedded) GetVPNInfo() (*vpn.Info, error) {
	if e.vpnInfo == nil {
		return nil, errors.New("VPN info not set")
	}
	return e.vpnInfo, nil
}