Merge pull request #5325 from rbrtbnfgl/fix-etcd-ipv6-url

Fixed etcd URL in case of IPv6 address
This commit is contained in:
Roberto Bonafiglia 2022-04-05 09:55:42 +02:00 committed by GitHub
commit 4afeb9c5c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 104 additions and 31 deletions

View file

@ -308,7 +308,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
// If the supervisor and externally-facing apiserver are not on the same port, tell the proxy where to find the apiserver.
if controlConfig.SupervisorPort != controlConfig.HTTPSPort {
if err := proxy.SetAPIServerPort(ctx, controlConfig.HTTPSPort); err != nil {
_, isIPv6, _ := util.GetFirstString([]string{envInfo.NodeIP.String()})
if err := proxy.SetAPIServerPort(ctx, controlConfig.HTTPSPort, isIPv6); err != nil {
return nil, errors.Wrapf(err, "failed to setup access to API Server port %d on at %s", controlConfig.HTTPSPort, proxy.SupervisorURL())
}
}

View file

@ -40,9 +40,13 @@ var (
ETCDServerServiceName = version.Program + "-etcd-server-load-balancer"
)
func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) {
func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPort int, isIPv6 bool) (_lb *LoadBalancer, _err error) {
config := net.ListenConfig{Control: reusePort}
listener, err := config.Listen(ctx, "tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort))
localhostAddress := "127.0.0.1"
if isIPv6 {
localhostAddress = "[::1]"
}
listener, err := config.Listen(ctx, "tcp", localhostAddress+":"+strconv.Itoa(lbServerPort))
defer func() {
if _err != nil {
logrus.Warnf("Error starting load balancer: %s", _err)

View file

@ -106,7 +106,7 @@ func Test_UnitFailOver(t *testing.T) {
DataDir: tmpDir,
}
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort, false)
if err != nil {
assertEqual(t, err, nil)
}
@ -157,7 +157,7 @@ func Test_UnitFailFast(t *testing.T) {
DataDir: tmpDir,
}
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort, false)
if err != nil {
assertEqual(t, err, nil)
}

View file

@ -14,7 +14,7 @@ import (
type Proxy interface {
Update(addresses []string)
SetAPIServerPort(ctx context.Context, port int) error
SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) error
SetSupervisorDefault(address string)
SupervisorURL() string
SupervisorAddresses() []string
@ -29,7 +29,7 @@ type Proxy interface {
// NOTE: This is a proxy in the API sense - it returns either actual server URLs, or the URL of the
// local load-balancer. It is not actually responsible for proxying requests at the network level;
// this is handled by the load-balancers that the proxy optionally steers connections towards.
func NewSupervisorProxy(ctx context.Context, lbEnabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) {
func NewSupervisorProxy(ctx context.Context, lbEnabled bool, dataDir, supervisorURL string, lbServerPort int, isIPv6 bool) (Proxy, error) {
p := proxy{
lbEnabled: lbEnabled,
dataDir: dataDir,
@ -40,7 +40,7 @@ func NewSupervisorProxy(ctx context.Context, lbEnabled bool, dataDir, supervisor
}
if lbEnabled {
lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort)
lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort, isIPv6)
if err != nil {
return nil, err
}
@ -110,7 +110,7 @@ func (p *proxy) setSupervisorPort(addresses []string) []string {
// load-balancing is enabled, another load-balancer is started on a port one below the supervisor
// load-balancer, and the address of this load-balancer is returned instead of the actual apiserver
// addresses.
func (p *proxy) SetAPIServerPort(ctx context.Context, port int) error {
func (p *proxy) SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) error {
u, err := url.Parse(p.initialSupervisorURL)
if err != nil {
return errors.Wrapf(err, "failed to parse server URL %s", p.initialSupervisorURL)
@ -125,7 +125,7 @@ func (p *proxy) SetAPIServerPort(ctx context.Context, port int) error {
if lbServerPort != 0 {
lbServerPort = lbServerPort - 1
}
lb, err := loadbalancer.New(ctx, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort)
lb, err := loadbalancer.New(ctx, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort, isIPv6)
if err != nil {
return err
}

View file

@ -239,8 +239,9 @@ func createProxyAndValidateToken(ctx context.Context, cfg *cmds.Agent) (proxy.Pr
if err := os.MkdirAll(agentDir, 0700); err != nil {
return nil, err
}
_, isIPv6, _ := util.GetFirstString([]string{cfg.NodeIP.String()})
proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort)
proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort, isIPv6)
if err != nil {
return nil, err
}

View file

@ -167,7 +167,7 @@ func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string,
for {
remotedialer.ClientConnect(ctx, wsURL, nil, ws, func(proto, address string) bool {
host, port, err := net.SplitHostPort(address)
return err == nil && proto == "tcp" && ports[port] && host == "127.0.0.1"
return err == nil && proto == "tcp" && ports[port] && (host == "127.0.0.1" || host == "::1")
}, func(_ context.Context) error {
if waitGroup != nil {
once.Do(waitGroup.Done)

View file

@ -24,7 +24,7 @@ func Register(ctx context.Context, runtime *config.ControlRuntime, endpoints con
}
endpoints.OnChange(ctx, version.Program+"-apiserver-lb-controller", h.sync)
cl, err := etcd.GetClient(h.ctx, h.runtime, "https://127.0.0.1:2379")
cl, err := etcd.GetClient(h.ctx, h.runtime)
if err != nil {
return err
}

View file

@ -460,9 +460,12 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
if ip == "" {
ip = "127.0.0.1"
if IPv6only {
ip = "[::1]"
ip = "::1"
}
}
if utilsnet.IsIPv6String(ip) {
ip = fmt.Sprintf("[%s]", ip)
}
url := fmt.Sprintf("https://%s:%d", ip, serverConfig.ControlConfig.SupervisorPort)
token, err := clientaccess.FormatToken(serverConfig.ControlConfig.Runtime.AgentToken, serverConfig.ControlConfig.Runtime.ServerCA)

View file

@ -10,6 +10,7 @@ import (
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/kine/pkg/endpoint"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -52,7 +53,8 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
clientURL.Host = clientURL.Hostname() + ":2379"
clientURLs = append(clientURLs, clientURL.String())
}
etcdProxy, err := etcd.NewETCDProxy(ctx, true, c.config.DataDir, clientURLs[0])
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(c.config.ServiceIPRanges)
etcdProxy, err := etcd.NewETCDProxy(ctx, true, c.config.DataDir, clientURLs[0], IPv6OnlyService)
if err != nil {
return nil, err
}

View file

@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
@ -22,6 +23,7 @@ import (
"github.com/rancher/wrangler/pkg/generated/controllers/core"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilsnet "k8s.io/utils/net"
)
// newListener returns a new TCP listener and HTTP request handler using dynamiclistener.
@ -35,7 +37,11 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
os.Remove(filepath.Join(c.config.DataDir, "tls/dynamic-cert.json"))
}
}
tcp, err := dynamiclistener.NewTCPListener(c.config.BindAddress, c.config.SupervisorPort)
ip := c.config.BindAddress
if utilsnet.IsIPv6String(ip) {
ip = fmt.Sprintf("[%s]", ip)
}
tcp, err := dynamiclistener.NewTCPListener(ip, c.config.SupervisorPort)
if err != nil {
return nil, nil, err
}

View file

@ -22,6 +22,7 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/passwd"
"github.com/k3s-io/k3s/pkg/token"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/sirupsen/logrus"
@ -305,7 +306,13 @@ func genClientCerts(config *config.Control) error {
factory := getSigningCertFactory(regen, nil, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, runtime.ClientCA, runtime.ClientCAKey)
var certGen bool
apiEndpoint := fmt.Sprintf("https://127.0.0.1:%d", config.APIServerPort)
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(config.ServiceIPRanges)
ip := "127.0.0.1"
if IPv6OnlyService {
ip = "[::1]"
}
apiEndpoint := fmt.Sprintf("https://%s:%d", ip, config.APIServerPort)
certGen, err = factory("system:admin", []string{"system:masters"}, runtime.ClientAdminCert, runtime.ClientAdminKey)
if err != nil {

View file

@ -32,7 +32,13 @@ import (
_ "k8s.io/component-base/metrics/prometheus/restclient"
)
var localhostIP = net.ParseIP("127.0.0.1")
func getLocalhostIP(serviceCIDR []*net.IPNet) net.IP {
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(serviceCIDR)
if IPv6OnlyService {
return net.ParseIP("::1")
}
return net.ParseIP("127.0.0.1")
}
type roundTripFunc func(req *http.Request) (*http.Response, error)
@ -102,7 +108,7 @@ func controllerManager(ctx context.Context, cfg *config.Control) error {
"cluster-cidr": util.JoinIPNets(cfg.ClusterIPRanges),
"root-ca-file": runtime.ServerCA,
"profiling": "false",
"bind-address": localhostIP.String(),
"bind-address": getLocalhostIP(cfg.ServiceIPRanges).String(),
"secure-port": "10257",
"use-service-account-credentials": "true",
"cluster-signing-kube-apiserver-client-cert-file": runtime.ClientCA,
@ -134,7 +140,7 @@ func scheduler(ctx context.Context, cfg *config.Control) error {
"kubeconfig": runtime.KubeConfigScheduler,
"authorization-kubeconfig": runtime.KubeConfigScheduler,
"authentication-kubeconfig": runtime.KubeConfigScheduler,
"bind-address": localhostIP.String(),
"bind-address": getLocalhostIP(cfg.ServiceIPRanges).String(),
"secure-port": "10259",
"profiling": "false",
}
@ -171,7 +177,7 @@ func apiServer(ctx context.Context, cfg *config.Control) error {
argsMap["insecure-port"] = "0"
argsMap["secure-port"] = strconv.Itoa(cfg.APIServerPort)
if cfg.APIServerBindAddress == "" {
argsMap["bind-address"] = localhostIP.String()
argsMap["bind-address"] = getLocalhostIP(cfg.ServiceIPRanges).String()
} else {
argsMap["bind-address"] = cfg.APIServerBindAddress
}
@ -304,7 +310,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
"authorization-kubeconfig": runtime.KubeConfigCloudController,
"authentication-kubeconfig": runtime.KubeConfigCloudController,
"node-status-update-frequency": "1m0s",
"bind-address": "127.0.0.1",
"bind-address": getLocalhostIP(cfg.ServiceIPRanges).String(),
"port": "0",
}
if cfg.NoLeaderElect {

View file

@ -45,6 +45,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/util/retry"
utilsnet "k8s.io/utils/net"
)
const (
@ -133,6 +134,13 @@ func NewETCD() *ETCD {
}
}
func getLocalhostAddress(address string) string {
if utilsnet.IsIPv6String(address) {
return "[::1]"
}
return "127.0.0.1"
}
// EndpointName returns the name of the endpoint.
func (e *ETCD) EndpointName() string {
return "etcd"
@ -744,19 +752,29 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error {
// peerURL returns the peer access address for the local node
func (e *ETCD) peerURL() string {
if utilsnet.IsIPv6String(e.address) {
return fmt.Sprintf("https://[%s]:2380", e.address)
}
return fmt.Sprintf("https://%s:2380", e.address)
}
// clientURL returns the client access address for the local node
func (e *ETCD) clientURL() string {
if utilsnet.IsIPv6String(e.address) {
return fmt.Sprintf("https://[%s]:2379", e.address)
}
return fmt.Sprintf("https://%s:2379", e.address)
}
// metricsURL returns the metrics access address
func (e *ETCD) metricsURL(expose bool) string {
address := "http://127.0.0.1:2381"
address := fmt.Sprintf("http://%s:2381", getLocalhostAddress(e.address))
if expose {
address = fmt.Sprintf("http://%s:2381,%s", e.address, address)
if utilsnet.IsIPv6String(e.address) {
address = fmt.Sprintf("http://[%s]:2381,%s", e.address, address)
} else {
address = fmt.Sprintf("http://%s:2381,%s", e.address, address)
}
}
return address
}
@ -767,7 +785,7 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
Name: e.name,
InitialOptions: options,
ForceNewCluster: forceNew,
ListenClientURLs: e.clientURL() + ",https://127.0.0.1:2379",
ListenClientURLs: e.clientURL() + "," + fmt.Sprintf("https://%s:2379", getLocalhostAddress(e.address)),
ListenMetricsURLs: e.metricsURL(e.config.EtcdExposeMetrics),
ListenPeerURLs: e.peerURL(),
AdvertiseClientURLs: e.clientURL(),

View file

@ -17,7 +17,7 @@ type Proxy interface {
// NewETCDProxy initializes a new proxy structure that contain a load balancer
// which listens on port 2379 and proxy between etcd cluster members
func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string) (Proxy, error) {
func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string, isIPv6 bool) (Proxy, error) {
u, err := url.Parse(etcdURL)
if err != nil {
return nil, errors.Wrap(err, "failed to parse etcd client URL")
@ -30,7 +30,7 @@ func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string) (P
}
if enabled {
lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379)
lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379, isIPv6)
if err != nil {
return nil, err
}

View file

@ -37,6 +37,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
utilsnet "k8s.io/utils/net"
)
const (
@ -91,7 +92,12 @@ func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
if err == nil {
ip = hostIP
} else {
ip = net2.ParseIP("127.0.0.1")
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(config.ControlConfig.ServiceIPRanges)
if IPv6OnlyService {
ip = net2.ParseIP("::1")
} else {
ip = net2.ParseIP("127.0.0.1")
}
}
}
@ -333,7 +339,12 @@ func printTokens(advertiseIP string, config *config.Control) error {
)
if advertiseIP == "" {
advertiseIP = "127.0.0.1"
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(config.ServiceIPRanges)
if IPv6OnlyService {
advertiseIP = "::1"
} else {
advertiseIP = "127.0.0.1"
}
}
if len(config.Runtime.ServerToken) > 0 {
@ -365,12 +376,24 @@ func printTokens(advertiseIP string, config *config.Control) error {
func writeKubeConfig(certs string, config *Config) error {
ip := config.ControlConfig.BindAddress
if ip == "" {
ip = "127.0.0.1"
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(config.ControlConfig.ServiceIPRanges)
if IPv6OnlyService {
ip = "[::1]"
} else {
ip = "127.0.0.1"
}
} else if utilsnet.IsIPv6String(ip) {
ip = fmt.Sprintf("[%s]", ip)
}
port := config.ControlConfig.HTTPSPort
// on servers without a local apiserver, tunnel access via the loadbalancer
if config.ControlConfig.DisableAPIServer {
ip = "127.0.0.1"
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(config.ControlConfig.ServiceIPRanges)
if IPv6OnlyService {
ip = "[::1]"
} else {
ip = "127.0.0.1"
}
port = config.ControlConfig.APIServerPort
}
url := fmt.Sprintf("https://%s:%d", ip, port)
@ -454,6 +477,8 @@ func printToken(httpsPort int, advertiseIP, prefix, cmd string) {
logrus.Errorf("Failed to choose interface: %v", err)
}
ip = hostIP.String()
} else if utilsnet.IsIPv6String(ip) {
ip = fmt.Sprintf("[%s]", ip)
}
logrus.Infof("%s %s %s -s https://%s:%d -t ${NODE_TOKEN}", prefix, version.Program, cmd, ip, httpsPort)