From 7ba37bba2329c0b2e06958075ac30459f67d54cb Mon Sep 17 00:00:00 2001 From: jokestax Date: Wed, 13 May 2026 09:09:08 +0000 Subject: [PATCH 1/3] feat: add k3s-owned systemd watchdog notifier Signed-off-by: jokestax --- pkg/agent/run.go | 34 +++++ pkg/agent/run_test.go | 22 ++++ pkg/cli/cmds/log_linux.go | 6 +- pkg/cli/server/server.go | 40 ++++++ pkg/daemons/health/health.go | 115 +++++++++++++++++ pkg/daemons/health/health_test.go | 121 ++++++++++++++++++ pkg/daemons/watchdog/watchdog.go | 87 +++++++++++++ pkg/daemons/watchdog/watchdog_test.go | 174 ++++++++++++++++++++++++++ 8 files changed, 598 insertions(+), 1 deletion(-) create mode 100644 pkg/daemons/health/health.go create mode 100644 pkg/daemons/health/health_test.go create mode 100644 pkg/daemons/watchdog/watchdog.go create mode 100644 pkg/daemons/watchdog/watchdog_test.go diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 3d2436198dc..517d07b4513 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -26,6 +26,8 @@ import ( "github.com/k3s-io/k3s/pkg/daemons/agent" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/executor" + "github.com/k3s-io/k3s/pkg/daemons/health" + "github.com/k3s-io/k3s/pkg/daemons/watchdog" "github.com/k3s-io/k3s/pkg/metrics" "github.com/k3s-io/k3s/pkg/nodeconfig" "github.com/k3s-io/k3s/pkg/profile" @@ -164,12 +166,44 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { logrus.Info(version.Program + " agent is up and running") os.Setenv("NOTIFY_SOCKET", notifySocket) systemd.SdNotify(true, "READY=1\n") + go watchdog.Run(ctx, notifySocket, agentHealthGroup(nodeConfig)) } }() return nil } +// agentHealthGroup returns the set of liveness checks that must all pass for +// the k3s agent process to be considered healthy by the systemd watchdog. It +// is only used on agent-only nodes; on server nodes the server's checker set +// also covers kubelet and kube-proxy. kube-proxy is not included here because +// its disabled state is not stored on nodeConfig — adding it unconditionally +// would silence the watchdog whenever kube-proxy is disabled. +func agentHealthGroup(nodeConfig *daemonconfig.Node) *health.Group { + g := health.NewGroup() + + g.Add(health.HTTPGet("kubelet", "http://127.0.0.1:10248/healthz")) + if socket := criSocketPath(nodeConfig); socket != "" { + g.Add(health.UnixSocket("cri", socket)) + } + return g +} + +// criSocketPath returns a filesystem path suitable for net.DialUnix, stripping +// the "unix://" scheme that some configurations use. It returns "" when the +// runtime socket is unknown or not a unix socket. +func criSocketPath(nodeConfig *daemonconfig.Node) string { + addr := nodeConfig.AgentConfig.RuntimeSocket + if addr == "" { + return "" + } + addr = strings.TrimPrefix(addr, "unix://") + if strings.Contains(addr, "://") { + return "" + } + return addr +} + // startCRI starts the configured CRI, or waits for an external CRI to be ready. func startCRI(ctx context.Context, nodeConfig *daemonconfig.Node) error { if nodeConfig.Docker { diff --git a/pkg/agent/run_test.go b/pkg/agent/run_test.go index 9f2255322e8..6819a248eb1 100644 --- a/pkg/agent/run_test.go +++ b/pkg/agent/run_test.go @@ -97,3 +97,25 @@ func Test_UnitGetConntrackConfig(t *testing.T) { }) } } + +func Test_UnitCRISocketPath(t *testing.T) { + tests := []struct { + name string + socket string + want string + }{ + {name: "empty", socket: "", want: ""}, + {name: "plain path", socket: "/run/k3s/containerd/containerd.sock", want: "/run/k3s/containerd/containerd.sock"}, + {name: "unix scheme", socket: "unix:///run/k3s/containerd/containerd.sock", want: "/run/k3s/containerd/containerd.sock"}, + {name: "non-unix scheme", socket: "tcp://127.0.0.1:1234", want: ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nodeConfig := &daemonconfig.Node{} + nodeConfig.AgentConfig.RuntimeSocket = tt.socket + if got := criSocketPath(nodeConfig); got != tt.want { + t.Errorf("criSocketPath(%q) = %q, want %q", tt.socket, got, tt.want) + } + }) + } +} diff --git a/pkg/cli/cmds/log_linux.go b/pkg/cli/cmds/log_linux.go index 513e94db05d..389673f5d40 100644 --- a/pkg/cli/cmds/log_linux.go +++ b/pkg/cli/cmds/log_linux.go @@ -57,7 +57,11 @@ func forkIfLoggingOrReaping() error { } args := append([]string{version.Program}, os.Args[1:]...) - env := append(os.Environ(), "_K3S_LOG_REEXEC_=true", "NOTIFY_SOCKET=") + // NOTIFY_SOCKET is intentionally passed through to the child so that + // pkg/daemons/watchdog can drive systemd's WATCHDOG=1 pings. The child + // strips NOTIFY_SOCKET from its env early (server.go / agent run.go) + // before any embedded component can read it. + env := append(os.Environ(), "_K3S_LOG_REEXEC_=true") ctx := signals.SetupSignalContext() cmd := exec.CommandContext(ctx, "/proc/self/exe") cmd.Args = args diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index b6e3257b328..7aa8af0e950 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -6,6 +6,7 @@ import ( "net" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -18,6 +19,8 @@ import ( "github.com/k3s-io/k3s/pkg/clientaccess" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/executor" + "github.com/k3s-io/k3s/pkg/daemons/health" + "github.com/k3s-io/k3s/pkg/daemons/watchdog" "github.com/k3s-io/k3s/pkg/datadir" "github.com/k3s-io/k3s/pkg/etcd" k3smetrics "github.com/k3s-io/k3s/pkg/metrics" @@ -615,11 +618,48 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont logrus.Info(version.Program + " is up and running") os.Setenv("NOTIFY_SOCKET", notifySocket) systemd.SdNotify(true, "READY=1\n") + go watchdog.Run(ctx, notifySocket, serverHealthGroup(&serverConfig.ControlConfig, cfg.DisableAgent)) }() return server.StartServer(ctx, wg, &serverConfig, cfg) } +// serverHealthGroup returns the set of liveness checks that must all pass for +// the k3s server process to be considered healthy by the systemd watchdog. +// Components that are disabled via --disable-* flags are skipped. When the +// embedded agent is running on this node, kubelet and kube-proxy checks are +// included as well, because the same k3s process is responsible for their +// liveness too. All probes target loopback because every component lives in +// the same process. +func serverHealthGroup(cc *config.Control, agentDisabled bool) *health.Group { + g := health.NewGroup() + host := cc.Loopback(false) + + if !cc.DisableAPIServer && cc.HTTPSPort > 0 { + url := fmt.Sprintf("https://%s/livez", net.JoinHostPort(host, strconv.Itoa(cc.HTTPSPort))) + g.Add(health.HTTPGet("kube-apiserver", url)) + } + if !cc.DisableETCD { + g.Add(health.TCP("etcd", net.JoinHostPort(host, "2379"))) + } + if !cc.DisableControllerManager { + g.Add(health.TCP("kube-controller-manager", net.JoinHostPort(host, "10257"))) + } + if !cc.DisableScheduler { + g.Add(health.TCP("kube-scheduler", net.JoinHostPort(host, "10259"))) + } + if cc.SupervisorPort > 0 { + g.Add(health.TCP("supervisor", net.JoinHostPort(host, strconv.Itoa(cc.SupervisorPort)))) + } + if !agentDisabled { + g.Add(health.HTTPGet("kubelet", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10248")))) + if !cc.DisableKubeProxy { + g.Add(health.HTTPGet("kube-proxy", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10256")))) + } + } + return g +} + // validateNetworkConfig ensures that the network configuration values make sense. func validateNetworkConfiguration(serverConfig server.Config) error { switch serverConfig.ControlConfig.EgressSelectorMode { diff --git a/pkg/daemons/health/health.go b/pkg/daemons/health/health.go new file mode 100644 index 00000000000..732c0c91f7b --- /dev/null +++ b/pkg/daemons/health/health.go @@ -0,0 +1,115 @@ +// Package health provides a small framework for registering per-component +// liveness probes used by the systemd watchdog notifier. Each registered +// Checker is invoked on every watchdog tick; if any check fails the notifier +// stays silent and systemd will eventually restart k3s. +package health + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "time" +) + +const dialTimeout = 5 * time.Second + +type Checker interface { + Name() string + Check(ctx context.Context) error +} + +type Func struct { + ComponentName string + Probe func(ctx context.Context) error +} + +func (f Func) Name() string { return f.ComponentName } +func (f Func) Check(ctx context.Context) error { return f.Probe(ctx) } + +type Group struct { + checkers []Checker +} + +func NewGroup() *Group { return &Group{} } + +func (g *Group) Add(c ...Checker) { + for _, ck := range c { + if ck == nil { + continue + } + g.checkers = append(g.checkers, ck) + } +} + +func (g *Group) Len() int { return len(g.checkers) } + +func (g *Group) Names() []string { + names := make([]string, len(g.checkers)) + for i, c := range g.checkers { + names[i] = c.Name() + } + return names +} + +func (g *Group) CheckAll(ctx context.Context) (string, error) { + for _, c := range g.checkers { + if err := c.Check(ctx); err != nil { + return c.Name(), err + } + } + return "", nil +} + +func TCP(name, addr string) Checker { + return Func{ComponentName: name, Probe: func(ctx context.Context) error { + probeCtx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + var d net.Dialer + conn, err := d.DialContext(probeCtx, "tcp", addr) + if err != nil { + return fmt.Errorf("dial tcp %s: %w", addr, err) + } + return conn.Close() + }} +} + +func UnixSocket(name, path string) Checker { + return Func{ComponentName: name, Probe: func(ctx context.Context) error { + probeCtx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + var d net.Dialer + conn, err := d.DialContext(probeCtx, "unix", path) + if err != nil { + return fmt.Errorf("dial unix %s: %w", path, err) + } + return conn.Close() + }} +} + +func HTTPGet(name, url string) Checker { + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + DisableKeepAlives: true, + }, + } + return Func{ComponentName: name, Probe: func(ctx context.Context) error { + probeCtx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, url, nil) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("get %s: %w", url, err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("get %s: status %d", url, resp.StatusCode) + } + return nil + }} +} diff --git a/pkg/daemons/health/health_test.go b/pkg/daemons/health/health_test.go new file mode 100644 index 00000000000..22716d0a0de --- /dev/null +++ b/pkg/daemons/health/health_test.go @@ -0,0 +1,121 @@ +package health + +import ( + "context" + "errors" + "net" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" +) + +func Test_UnitGroupCheckAll(t *testing.T) { + boom := errors.New("boom") + g := NewGroup() + g.Add( + Func{ComponentName: "first", Probe: func(_ context.Context) error { return nil }}, + Func{ComponentName: "second", Probe: func(_ context.Context) error { return boom }}, + Func{ComponentName: "third", Probe: func(_ context.Context) error { t.Fatal("third should not run after second failed"); return nil }}, + ) + name, err := g.CheckAll(context.Background()) + if name != "second" { + t.Errorf("expected first failure to be %q, got %q", "second", name) + } + if !errors.Is(err, boom) { + t.Errorf("expected wrapped boom error, got %v", err) + } +} + +func Test_UnitGroupCheckAllPasses(t *testing.T) { + g := NewGroup() + g.Add( + Func{ComponentName: "a", Probe: func(_ context.Context) error { return nil }}, + Func{ComponentName: "b", Probe: func(_ context.Context) error { return nil }}, + ) + if name, err := g.CheckAll(context.Background()); name != "" || err != nil { + t.Errorf("expected ('', nil), got (%q, %v)", name, err) + } +} + +func Test_UnitGroupAddSkipsNil(t *testing.T) { + g := NewGroup() + g.Add(nil, Func{ComponentName: "x", Probe: func(_ context.Context) error { return nil }}, nil) + if g.Len() != 1 { + t.Errorf("expected nil checkers to be skipped; got Len=%d", g.Len()) + } + if names := g.Names(); len(names) != 1 || names[0] != "x" { + t.Errorf("unexpected names: %v", names) + } +} + +func Test_UnitTCPChecker(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + t.Cleanup(func() { ln.Close() }) + + if err := TCP("ok", ln.Addr().String()).Check(context.Background()); err != nil { + t.Errorf("expected open port to pass, got %v", err) + } + ln.Close() + if err := TCP("closed", ln.Addr().String()).Check(context.Background()); err == nil { + t.Errorf("expected closed port to fail") + } +} + +func Test_UnitUnixSocketChecker(t *testing.T) { + dir := t.TempDir() + socket := filepath.Join(dir, "test.sock") + + ln, err := net.Listen("unix", socket) + if err != nil { + t.Fatalf("listen unix: %v", err) + } + t.Cleanup(func() { ln.Close() }) + + if err := UnixSocket("ok", socket).Check(context.Background()); err != nil { + t.Errorf("expected open socket to pass, got %v", err) + } + + ln.Close() + _ = os.Remove(socket) + if err := UnixSocket("missing", socket).Check(context.Background()); err == nil { + t.Errorf("expected missing socket to fail") + } +} + +func Test_UnitHTTPGetChecker(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/ok": + w.WriteHeader(http.StatusOK) + case "/fail": + w.WriteHeader(http.StatusInternalServerError) + } + })) + t.Cleanup(srv.Close) + + if err := HTTPGet("ok", srv.URL+"/ok").Check(context.Background()); err != nil { + t.Errorf("expected 200 response to pass, got %v", err) + } + if err := HTTPGet("fail", srv.URL+"/fail").Check(context.Background()); err == nil { + t.Errorf("expected 500 response to fail") + } + if err := HTTPGet("dial", "http://127.0.0.1:1/never").Check(context.Background()); err == nil { + t.Errorf("expected unreachable URL to fail") + } +} + +func Test_UnitHTTPGetCheckerSkipsTLSVerify(t *testing.T) { + // httptest.NewTLSServer uses a self-signed cert; the checker must accept it. + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + if err := HTTPGet("tls", srv.URL+"/livez").Check(context.Background()); err != nil { + t.Errorf("expected TLS-skip-verify probe to pass against self-signed cert, got %v", err) + } +} diff --git a/pkg/daemons/watchdog/watchdog.go b/pkg/daemons/watchdog/watchdog.go new file mode 100644 index 00000000000..9a8fde6b67f --- /dev/null +++ b/pkg/daemons/watchdog/watchdog.go @@ -0,0 +1,87 @@ +// Package watchdog implements the k3s side of the systemd watchdog protocol. +// +// k3s strips NOTIFY_SOCKET from the process environment early in startup so +// that embedded components (kubelet, etcd, kine, etc.) cannot send READY=1 or +// WATCHDOG=1 to systemd on behalf of the whole process. That is intentional: +// the kubelet by itself does not know whether etcd, the API server, the CRI +// runtime, and the other in-process components are alive, so letting it ping +// the watchdog would mask whole-process failures. +// +// READY=1 is still sent the usual way via systemd.SdNotify by the server / +// agent startup code, which temporarily restores NOTIFY_SOCKET and then +// unsets it again. This package owns the periodic WATCHDOG=1 pings: callers +// pass in the cached NOTIFY_SOCKET value they captured before it was +// stripped, plus a health.Group covering every component that must be alive +// for the process to be considered healthy. WATCHDOG=1 is only sent while +// every Checker in the group passes; otherwise the loop stays quiet and +// systemd will restart the unit after WatchdogSec. +package watchdog + +import ( + "context" + "errors" + "net" + "time" + + systemd "github.com/coreos/go-systemd/v22/daemon" + "github.com/k3s-io/k3s/pkg/daemons/health" + "github.com/sirupsen/logrus" +) + +func Run(ctx context.Context, socketPath string, group *health.Group) { + if socketPath == "" { + return + } + if group == nil || group.Len() == 0 { + logrus.Warn("systemd watchdog: no health checks registered, notifier disabled") + return + } + interval, err := systemd.SdWatchdogEnabled(false) + if err != nil { + logrus.Warnf("systemd watchdog: failed to read WATCHDOG_USEC: %v", err) + return + } + if interval == 0 { + logrus.Debug("systemd watchdog: not enabled by unit, notifier disabled") + return + } + + tick := interval / 2 + logrus.Infof("systemd watchdog: pinging every %s (WatchdogSec=%s), monitoring components %v", + tick, interval, group.Names()) + + ticker := time.NewTicker(tick) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if name, err := group.CheckAll(ctx); err != nil { + logrus.Warnf("systemd watchdog: %q is unhealthy, withholding WATCHDOG=1: %v", name, err) + continue + } + if err := notify(socketPath, systemd.SdNotifyWatchdog); err != nil { + logrus.Warnf("systemd watchdog: failed to send WATCHDOG=1: %v", err) + } + } + } +} + +// notify writes a single state line as a datagram to the systemd notify +// socket. The socket is SOCK_DGRAM, so each call is a self-contained +// message and we do not need to manage a persistent connection. +func notify(socketPath, state string) error { + if socketPath == "" { + return errors.New("watchdog: empty notify socket path") + } + addr := &net.UnixAddr{Name: socketPath, Net: "unixgram"} + conn, err := net.DialUnix(addr.Net, nil, addr) + if err != nil { + return err + } + defer conn.Close() + _, err = conn.Write([]byte(state)) + return err +} diff --git a/pkg/daemons/watchdog/watchdog_test.go b/pkg/daemons/watchdog/watchdog_test.go new file mode 100644 index 00000000000..e53a078df46 --- /dev/null +++ b/pkg/daemons/watchdog/watchdog_test.go @@ -0,0 +1,174 @@ +package watchdog + +import ( + "context" + "errors" + "net" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/k3s-io/k3s/pkg/daemons/health" +) + +// startNotifyListener opens a unix datagram socket at a temporary path and +// returns the path plus a channel that receives every datagram written to it. +// The listener is cleaned up when the test ends. +func startNotifyListener(t *testing.T) (string, <-chan string) { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "notify.sock") + conn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{Name: path, Net: "unixgram"}) + if err != nil { + t.Fatalf("listen unixgram: %v", err) + } + t.Cleanup(func() { + conn.Close() + _ = os.Remove(path) + }) + + out := make(chan string, 16) + go func() { + buf := make([]byte, 1024) + for { + n, _, err := conn.ReadFromUnix(buf) + if err != nil { + close(out) + return + } + out <- string(buf[:n]) + } + }() + return path, out +} + +// withWatchdogEnv sets WATCHDOG_USEC for the duration of the test so the +// notify loop actually ticks. +func withWatchdogEnv(t *testing.T, usec string) { + t.Helper() + prev, had := os.LookupEnv("WATCHDOG_USEC") + if err := os.Setenv("WATCHDOG_USEC", usec); err != nil { + t.Fatalf("setenv: %v", err) + } + t.Cleanup(func() { + if had { + os.Setenv("WATCHDOG_USEC", prev) + } else { + os.Unsetenv("WATCHDOG_USEC") + } + }) +} + +func Test_UnitWatchdogNoSocketReturnsImmediately(t *testing.T) { + g := health.NewGroup() + g.Add(health.Func{ComponentName: "x", Probe: func(_ context.Context) error { return nil }}) + done := make(chan struct{}) + go func() { Run(context.Background(), "", g); close(done) }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Run did not return when socketPath was empty") + } +} + +func Test_UnitWatchdogEmptyGroupReturnsImmediately(t *testing.T) { + socket, _ := startNotifyListener(t) + withWatchdogEnv(t, "200000") // 200ms + + done := make(chan struct{}) + go func() { Run(context.Background(), socket, health.NewGroup()); close(done) }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Run did not return when group was empty") + } +} + +func Test_UnitWatchdogNoEnvReturnsImmediately(t *testing.T) { + socket, _ := startNotifyListener(t) + // Make sure WATCHDOG_USEC is unset. + prev, had := os.LookupEnv("WATCHDOG_USEC") + os.Unsetenv("WATCHDOG_USEC") + t.Cleanup(func() { + if had { + os.Setenv("WATCHDOG_USEC", prev) + } + }) + + g := health.NewGroup() + g.Add(health.Func{ComponentName: "x", Probe: func(_ context.Context) error { return nil }}) + done := make(chan struct{}) + go func() { Run(context.Background(), socket, g); close(done) }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Run did not return when WATCHDOG_USEC was unset") + } +} + +func Test_UnitWatchdogPingsWhenHealthy(t *testing.T) { + socket, msgs := startNotifyListener(t) + withWatchdogEnv(t, "100000") + + g := health.NewGroup() + g.Add(health.Func{ComponentName: "ok", Probe: func(_ context.Context) error { return nil }}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go Run(ctx, socket, g) + + select { + case got := <-msgs: + if got != "WATCHDOG=1" { + t.Errorf("expected WATCHDOG=1, got %q", got) + } + case <-time.After(2 * time.Second): + t.Fatal("did not receive WATCHDOG=1 within 2s") + } +} + +func Test_UnitWatchdogWithholdsPingWhenUnhealthy(t *testing.T) { + socket, msgs := startNotifyListener(t) + withWatchdogEnv(t, "100000") + + var calls atomic.Int32 + g := health.NewGroup() + g.Add(health.Func{ComponentName: "bad", Probe: func(_ context.Context) error { + calls.Add(1) + return errors.New("unhealthy") + }}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go Run(ctx, socket, g) + + select { + case got := <-msgs: + t.Fatalf("did not expect any WATCHDOG=1 ping, got %q", got) + case <-time.After(500 * time.Millisecond): + } + if calls.Load() == 0 { + t.Fatal("expected checker to have been invoked at least once") + } +} + +func Test_UnitWatchdogStopsOnContextCancel(t *testing.T) { + socket, _ := startNotifyListener(t) + withWatchdogEnv(t, "100000") + + g := health.NewGroup() + g.Add(health.Func{ComponentName: "ok", Probe: func(_ context.Context) error { return nil }}) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { Run(ctx, socket, g); close(done) }() + + cancel() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Run did not return after ctx cancellation") + } +} From 06a0a0c189d77223330fc42f2fe69374b714fe65 Mon Sep 17 00:00:00 2001 From: jokestax Date: Sat, 16 May 2026 11:40:20 +0000 Subject: [PATCH 2/3] fix: use upstream healthz.HealthChecker, gRPC CRI probe, real liveness endpoints Signed-off-by: jokestax --- pkg/agent/run.go | 50 ++++----- pkg/agent/run_test.go | 22 ---- pkg/cli/server/server.go | 48 +++++---- pkg/daemons/health/health.go | 129 ++++++++-------------- pkg/daemons/health/health_test.go | 147 +++++++++++++------------- pkg/daemons/watchdog/watchdog.go | 66 ++++++------ pkg/daemons/watchdog/watchdog_test.go | 106 ++++++++----------- 7 files changed, 255 insertions(+), 313 deletions(-) diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 517d07b4513..6dec618b3b0 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -43,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/client-go/kubernetes" toolscache "k8s.io/client-go/tools/cache" toolswatch "k8s.io/client-go/tools/watch" @@ -143,6 +144,14 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { notifySocket := os.Getenv("NOTIFY_SOCKET") os.Unsetenv("NOTIFY_SOCKET") + // Capture the watchdog interval before stripping WATCHDOG_USEC so the + // kubelet's own NewHealthChecker short-circuits and doesn't spawn a + // goroutine that logs "Failed to notify watchdog" every tick. + watchdogInterval, werr := systemd.SdWatchdogEnabled(true) + if werr != nil { + logrus.Warnf("systemd watchdog: failed to read WATCHDOG_USEC, watchdog disabled: %v", werr) + } + go func() { if err := startCRI(ctx, nodeConfig); err != nil { signals.RequestShutdown(errors.WithMessage(err, "failed to start container runtime")) @@ -166,42 +175,25 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { logrus.Info(version.Program + " agent is up and running") os.Setenv("NOTIFY_SOCKET", notifySocket) systemd.SdNotify(true, "READY=1\n") - go watchdog.Run(ctx, notifySocket, agentHealthGroup(nodeConfig)) + go watchdog.Run(ctx, notifySocket, watchdogInterval, agentHealthCheckers(nodeConfig)) } }() return nil } -// agentHealthGroup returns the set of liveness checks that must all pass for -// the k3s agent process to be considered healthy by the systemd watchdog. It -// is only used on agent-only nodes; on server nodes the server's checker set -// also covers kubelet and kube-proxy. kube-proxy is not included here because -// its disabled state is not stored on nodeConfig — adding it unconditionally -// would silence the watchdog whenever kube-proxy is disabled. -func agentHealthGroup(nodeConfig *daemonconfig.Node) *health.Group { - g := health.NewGroup() - - g.Add(health.HTTPGet("kubelet", "http://127.0.0.1:10248/healthz")) - if socket := criSocketPath(nodeConfig); socket != "" { - g.Add(health.UnixSocket("cri", socket)) +// agentHealthCheckers returns the set of liveness checks that must all pass +// for the k3s agent process to be considered healthy by the systemd +// watchdog. Only used on agent-only nodes; on server nodes the server's +// checker set covers kubelet (and kube-proxy when gated by DisableKubeProxy +// on Control). kube-proxy is not included here because its disabled state is +// not stored on nodeConfig — adding it unconditionally would silence the +// watchdog whenever kube-proxy is disabled. +func agentHealthCheckers(nodeConfig *daemonconfig.Node) []healthz.HealthChecker { + return []healthz.HealthChecker{ + health.HTTPGet("kubelet", "http://127.0.0.1:10248/healthz"), + health.GRPC("cri", nodeConfig.AgentConfig.RuntimeSocket), } - return g -} - -// criSocketPath returns a filesystem path suitable for net.DialUnix, stripping -// the "unix://" scheme that some configurations use. It returns "" when the -// runtime socket is unknown or not a unix socket. -func criSocketPath(nodeConfig *daemonconfig.Node) string { - addr := nodeConfig.AgentConfig.RuntimeSocket - if addr == "" { - return "" - } - addr = strings.TrimPrefix(addr, "unix://") - if strings.Contains(addr, "://") { - return "" - } - return addr } // startCRI starts the configured CRI, or waits for an external CRI to be ready. diff --git a/pkg/agent/run_test.go b/pkg/agent/run_test.go index 6819a248eb1..9f2255322e8 100644 --- a/pkg/agent/run_test.go +++ b/pkg/agent/run_test.go @@ -97,25 +97,3 @@ func Test_UnitGetConntrackConfig(t *testing.T) { }) } } - -func Test_UnitCRISocketPath(t *testing.T) { - tests := []struct { - name string - socket string - want string - }{ - {name: "empty", socket: "", want: ""}, - {name: "plain path", socket: "/run/k3s/containerd/containerd.sock", want: "/run/k3s/containerd/containerd.sock"}, - {name: "unix scheme", socket: "unix:///run/k3s/containerd/containerd.sock", want: "/run/k3s/containerd/containerd.sock"}, - {name: "non-unix scheme", socket: "tcp://127.0.0.1:1234", want: ""}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - nodeConfig := &daemonconfig.Node{} - nodeConfig.AgentConfig.RuntimeSocket = tt.socket - if got := criSocketPath(nodeConfig); got != tt.want { - t.Errorf("criSocketPath(%q) = %q, want %q", tt.socket, got, tt.want) - } - }) - } -} diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index 7aa8af0e950..ec24fd81b7e 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -42,6 +42,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/server/healthz" kubeapiserverflag "k8s.io/component-base/cli/flag" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controlplane/apiserver/options" @@ -479,6 +480,14 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont notifySocket := os.Getenv("NOTIFY_SOCKET") os.Unsetenv("NOTIFY_SOCKET") + // Capture the watchdog interval before stripping WATCHDOG_USEC so the + // kubelet's own NewHealthChecker short-circuits and doesn't spawn a + // goroutine that logs "Failed to notify watchdog" every tick. + watchdogInterval, err := systemd.SdWatchdogEnabled(true) + if err != nil { + logrus.Warnf("systemd watchdog: failed to read WATCHDOG_USEC, watchdog disabled: %v", err) + } + // try setting advertise-ip from agent VPN if vpnInfo, _ := vpn.GetInfoFromExecutor(); vpnInfo != nil { // If we are in ipv6-only mode, we should pass the ipv6 address. Otherwise, ipv4 @@ -618,46 +627,49 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont logrus.Info(version.Program + " is up and running") os.Setenv("NOTIFY_SOCKET", notifySocket) systemd.SdNotify(true, "READY=1\n") - go watchdog.Run(ctx, notifySocket, serverHealthGroup(&serverConfig.ControlConfig, cfg.DisableAgent)) + go watchdog.Run(ctx, notifySocket, watchdogInterval, serverHealthCheckers(&serverConfig.ControlConfig, cfg.DisableAgent)) }() return server.StartServer(ctx, wg, &serverConfig, cfg) } -// serverHealthGroup returns the set of liveness checks that must all pass for -// the k3s server process to be considered healthy by the systemd watchdog. -// Components that are disabled via --disable-* flags are skipped. When the -// embedded agent is running on this node, kubelet and kube-proxy checks are -// included as well, because the same k3s process is responsible for their -// liveness too. All probes target loopback because every component lives in -// the same process. -func serverHealthGroup(cc *config.Control, agentDisabled bool) *health.Group { - g := health.NewGroup() +// serverHealthCheckers returns the liveness checks that must all pass for the +// k3s server process to be considered healthy by the systemd watchdog. +// Components disabled via --disable-* are skipped. When the embedded agent is +// running on this node, kubelet and kube-proxy are included too, since the +// same process owns their liveness. Endpoints mirror the RKE2 static-pod +// readiness/liveness probes: +// +// https://github.com/rancher/rke2/blob/v1.36.0+rke2r1/pkg/podtemplate/spec.go +func serverHealthCheckers(cc *config.Control, agentDisabled bool) []healthz.HealthChecker { + var checkers []healthz.HealthChecker host := cc.Loopback(false) if !cc.DisableAPIServer && cc.HTTPSPort > 0 { url := fmt.Sprintf("https://%s/livez", net.JoinHostPort(host, strconv.Itoa(cc.HTTPSPort))) - g.Add(health.HTTPGet("kube-apiserver", url)) + checkers = append(checkers, health.HTTPGet("kube-apiserver", url)) } if !cc.DisableETCD { - g.Add(health.TCP("etcd", net.JoinHostPort(host, "2379"))) + checkers = append(checkers, health.HTTPGet("etcd", fmt.Sprintf("http://%s/health?serializable=true", net.JoinHostPort(host, "2381")))) } if !cc.DisableControllerManager { - g.Add(health.TCP("kube-controller-manager", net.JoinHostPort(host, "10257"))) + checkers = append(checkers, health.HTTPGet("kube-controller-manager", fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10257")))) } if !cc.DisableScheduler { - g.Add(health.TCP("kube-scheduler", net.JoinHostPort(host, "10259"))) + checkers = append(checkers, health.HTTPGet("kube-scheduler", fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10259")))) } if cc.SupervisorPort > 0 { - g.Add(health.TCP("supervisor", net.JoinHostPort(host, strconv.Itoa(cc.SupervisorPort)))) + // The supervisor has no HTTP /healthz; a TCP probe verifies the + // listener is still bound. + checkers = append(checkers, health.TCP("supervisor", net.JoinHostPort(host, strconv.Itoa(cc.SupervisorPort)))) } if !agentDisabled { - g.Add(health.HTTPGet("kubelet", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10248")))) + checkers = append(checkers, health.HTTPGet("kubelet", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10248")))) if !cc.DisableKubeProxy { - g.Add(health.HTTPGet("kube-proxy", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10256")))) + checkers = append(checkers, health.HTTPGet("kube-proxy", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10256")))) } } - return g + return checkers } // validateNetworkConfig ensures that the network configuration values make sense. diff --git a/pkg/daemons/health/health.go b/pkg/daemons/health/health.go index 732c0c91f7b..b3ded49b0f2 100644 --- a/pkg/daemons/health/health.go +++ b/pkg/daemons/health/health.go @@ -1,7 +1,3 @@ -// Package health provides a small framework for registering per-component -// liveness probes used by the systemd watchdog notifier. Each registered -// Checker is invoked on every watchdog tick; if any check fails the notifier -// stays silent and systemd will eventually restart k3s. package health import ( @@ -10,95 +6,28 @@ import ( "fmt" "net" "net/http" + "strings" "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "k8s.io/apiserver/pkg/server/healthz" ) const dialTimeout = 5 * time.Second -type Checker interface { - Name() string - Check(ctx context.Context) error -} - -type Func struct { - ComponentName string - Probe func(ctx context.Context) error -} - -func (f Func) Name() string { return f.ComponentName } -func (f Func) Check(ctx context.Context) error { return f.Probe(ctx) } - -type Group struct { - checkers []Checker -} - -func NewGroup() *Group { return &Group{} } - -func (g *Group) Add(c ...Checker) { - for _, ck := range c { - if ck == nil { - continue - } - g.checkers = append(g.checkers, ck) - } -} - -func (g *Group) Len() int { return len(g.checkers) } - -func (g *Group) Names() []string { - names := make([]string, len(g.checkers)) - for i, c := range g.checkers { - names[i] = c.Name() - } - return names -} - -func (g *Group) CheckAll(ctx context.Context) (string, error) { - for _, c := range g.checkers { - if err := c.Check(ctx); err != nil { - return c.Name(), err - } - } - return "", nil -} - -func TCP(name, addr string) Checker { - return Func{ComponentName: name, Probe: func(ctx context.Context) error { - probeCtx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - var d net.Dialer - conn, err := d.DialContext(probeCtx, "tcp", addr) - if err != nil { - return fmt.Errorf("dial tcp %s: %w", addr, err) - } - return conn.Close() - }} -} - -func UnixSocket(name, path string) Checker { - return Func{ComponentName: name, Probe: func(ctx context.Context) error { - probeCtx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - var d net.Dialer - conn, err := d.DialContext(probeCtx, "unix", path) - if err != nil { - return fmt.Errorf("dial unix %s: %w", path, err) - } - return conn.Close() - }} -} - -func HTTPGet(name, url string) Checker { +func HTTPGet(name, url string) healthz.HealthChecker { client := &http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, DisableKeepAlives: true, }, } - return Func{ComponentName: name, Probe: func(ctx context.Context) error { - probeCtx, cancel := context.WithTimeout(ctx, dialTimeout) + return healthz.NamedCheck(name, func(_ *http.Request) error { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) defer cancel() - req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, url, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return err } @@ -111,5 +40,41 @@ func HTTPGet(name, url string) Checker { return fmt.Errorf("get %s: status %d", url, resp.StatusCode) } return nil - }} + }) +} + +func TCP(name, addr string) healthz.HealthChecker { + return healthz.NamedCheck(name, func(_ *http.Request) error { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", addr) + if err != nil { + return fmt.Errorf("dial tcp %s: %w", addr, err) + } + return conn.Close() + }) +} + +func GRPC(name, target string) healthz.HealthChecker { + target = strings.TrimPrefix(target, "unix://") + dialTarget := "unix:" + target + return healthz.NamedCheck(name, func(_ *http.Request) error { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + conn, err := grpc.NewClient(dialTarget, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("dial grpc %s: %w", dialTarget, err) + } + defer conn.Close() + client := healthpb.NewHealthClient(conn) + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}) + if err != nil { + return fmt.Errorf("grpc health check %s: %w", dialTarget, err) + } + if resp.Status != healthpb.HealthCheckResponse_SERVING { + return fmt.Errorf("grpc health check %s: status %s", dialTarget, resp.Status) + } + return nil + }) } diff --git a/pkg/daemons/health/health_test.go b/pkg/daemons/health/health_test.go index 22716d0a0de..baf4254b396 100644 --- a/pkg/daemons/health/health_test.go +++ b/pkg/daemons/health/health_test.go @@ -2,54 +2,16 @@ package health import ( "context" - "errors" "net" "net/http" "net/http/httptest" - "os" "path/filepath" "testing" + + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -func Test_UnitGroupCheckAll(t *testing.T) { - boom := errors.New("boom") - g := NewGroup() - g.Add( - Func{ComponentName: "first", Probe: func(_ context.Context) error { return nil }}, - Func{ComponentName: "second", Probe: func(_ context.Context) error { return boom }}, - Func{ComponentName: "third", Probe: func(_ context.Context) error { t.Fatal("third should not run after second failed"); return nil }}, - ) - name, err := g.CheckAll(context.Background()) - if name != "second" { - t.Errorf("expected first failure to be %q, got %q", "second", name) - } - if !errors.Is(err, boom) { - t.Errorf("expected wrapped boom error, got %v", err) - } -} - -func Test_UnitGroupCheckAllPasses(t *testing.T) { - g := NewGroup() - g.Add( - Func{ComponentName: "a", Probe: func(_ context.Context) error { return nil }}, - Func{ComponentName: "b", Probe: func(_ context.Context) error { return nil }}, - ) - if name, err := g.CheckAll(context.Background()); name != "" || err != nil { - t.Errorf("expected ('', nil), got (%q, %v)", name, err) - } -} - -func Test_UnitGroupAddSkipsNil(t *testing.T) { - g := NewGroup() - g.Add(nil, Func{ComponentName: "x", Probe: func(_ context.Context) error { return nil }}, nil) - if g.Len() != 1 { - t.Errorf("expected nil checkers to be skipped; got Len=%d", g.Len()) - } - if names := g.Names(); len(names) != 1 || names[0] != "x" { - t.Errorf("unexpected names: %v", names) - } -} - func Test_UnitTCPChecker(t *testing.T) { ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -57,54 +19,37 @@ func Test_UnitTCPChecker(t *testing.T) { } t.Cleanup(func() { ln.Close() }) - if err := TCP("ok", ln.Addr().String()).Check(context.Background()); err != nil { + c := TCP("ok", ln.Addr().String()) + if c.Name() != "ok" { + t.Errorf("Name() = %q, want %q", c.Name(), "ok") + } + if err := c.Check(nil); err != nil { t.Errorf("expected open port to pass, got %v", err) } ln.Close() - if err := TCP("closed", ln.Addr().String()).Check(context.Background()); err == nil { + if err := TCP("closed", ln.Addr().String()).Check(nil); err == nil { t.Errorf("expected closed port to fail") } } -func Test_UnitUnixSocketChecker(t *testing.T) { - dir := t.TempDir() - socket := filepath.Join(dir, "test.sock") - - ln, err := net.Listen("unix", socket) - if err != nil { - t.Fatalf("listen unix: %v", err) - } - t.Cleanup(func() { ln.Close() }) - - if err := UnixSocket("ok", socket).Check(context.Background()); err != nil { - t.Errorf("expected open socket to pass, got %v", err) - } - - ln.Close() - _ = os.Remove(socket) - if err := UnixSocket("missing", socket).Check(context.Background()); err == nil { - t.Errorf("expected missing socket to fail") - } -} - func Test_UnitHTTPGetChecker(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/ok": w.WriteHeader(http.StatusOK) - case "/fail": + default: w.WriteHeader(http.StatusInternalServerError) } })) t.Cleanup(srv.Close) - if err := HTTPGet("ok", srv.URL+"/ok").Check(context.Background()); err != nil { - t.Errorf("expected 200 response to pass, got %v", err) + if err := HTTPGet("ok", srv.URL+"/ok").Check(nil); err != nil { + t.Errorf("expected 200 to pass, got %v", err) } - if err := HTTPGet("fail", srv.URL+"/fail").Check(context.Background()); err == nil { - t.Errorf("expected 500 response to fail") + if err := HTTPGet("fail", srv.URL+"/fail").Check(nil); err == nil { + t.Errorf("expected 500 to fail") } - if err := HTTPGet("dial", "http://127.0.0.1:1/never").Check(context.Background()); err == nil { + if err := HTTPGet("dial", "http://127.0.0.1:1/never").Check(nil); err == nil { t.Errorf("expected unreachable URL to fail") } } @@ -115,7 +60,67 @@ func Test_UnitHTTPGetCheckerSkipsTLSVerify(t *testing.T) { w.WriteHeader(http.StatusOK) })) t.Cleanup(srv.Close) - if err := HTTPGet("tls", srv.URL+"/livez").Check(context.Background()); err != nil { + if err := HTTPGet("tls", srv.URL+"/livez").Check(nil); err != nil { t.Errorf("expected TLS-skip-verify probe to pass against self-signed cert, got %v", err) } } + +// healthServer is a minimal implementation of grpc.health.v1.Health that +// returns a configurable status — used to test the gRPC health checker +// against both SERVING and NOT_SERVING responses. +type healthServer struct { + healthpb.UnimplementedHealthServer + status healthpb.HealthCheckResponse_ServingStatus +} + +func (h *healthServer) Check(_ context.Context, _ *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { + return &healthpb.HealthCheckResponse{Status: h.status}, nil +} + +func startHealthServer(t *testing.T, status healthpb.HealthCheckResponse_ServingStatus) string { + t.Helper() + dir := t.TempDir() + socket := filepath.Join(dir, "grpc.sock") + + ln, err := net.Listen("unix", socket) + if err != nil { + t.Fatalf("listen unix: %v", err) + } + srv := grpc.NewServer() + healthpb.RegisterHealthServer(srv, &healthServer{status: status}) + go srv.Serve(ln) + t.Cleanup(func() { + srv.Stop() + ln.Close() + }) + return socket +} + +func Test_UnitGRPCCheckerServing(t *testing.T) { + socket := startHealthServer(t, healthpb.HealthCheckResponse_SERVING) + if err := GRPC("cri", socket).Check(nil); err != nil { + t.Errorf("expected SERVING status to pass, got %v", err) + } +} + +func Test_UnitGRPCCheckerNotServing(t *testing.T) { + socket := startHealthServer(t, healthpb.HealthCheckResponse_NOT_SERVING) + if err := GRPC("cri", socket).Check(nil); err == nil { + t.Errorf("expected NOT_SERVING status to fail") + } +} + +func Test_UnitGRPCCheckerStripsUnixScheme(t *testing.T) { + socket := startHealthServer(t, healthpb.HealthCheckResponse_SERVING) + if err := GRPC("cri", "unix://"+socket).Check(nil); err != nil { + t.Errorf("expected unix:// scheme to be stripped, got %v", err) + } +} + +func Test_UnitGRPCCheckerMissingSocket(t *testing.T) { + dir := t.TempDir() + socket := filepath.Join(dir, "absent.sock") + if err := GRPC("cri", socket).Check(nil); err == nil { + t.Errorf("expected missing socket to fail") + } +} diff --git a/pkg/daemons/watchdog/watchdog.go b/pkg/daemons/watchdog/watchdog.go index 9a8fde6b67f..bb395b42f59 100644 --- a/pkg/daemons/watchdog/watchdog.go +++ b/pkg/daemons/watchdog/watchdog.go @@ -1,20 +1,21 @@ -// Package watchdog implements the k3s side of the systemd watchdog protocol. +// Package watchdog implements the k3s side of the systemd notify / watchdog +// protocol. // -// k3s strips NOTIFY_SOCKET from the process environment early in startup so -// that embedded components (kubelet, etcd, kine, etc.) cannot send READY=1 or -// WATCHDOG=1 to systemd on behalf of the whole process. That is intentional: -// the kubelet by itself does not know whether etcd, the API server, the CRI -// runtime, and the other in-process components are alive, so letting it ping -// the watchdog would mask whole-process failures. +// k3s strips NOTIFY_SOCKET (and WATCHDOG_USEC) from the process environment +// early in startup so embedded components — kubelet, etcd, kine, etc. — +// cannot ping systemd on behalf of the whole process. That is intentional: +// the kubelet by itself has no visibility into etcd, the apiserver, or the +// CRI runtime, so letting it ping the watchdog would mask whole-process +// failures. // -// READY=1 is still sent the usual way via systemd.SdNotify by the server / +// READY=1 is still sent the usual way via systemd.SdNotify in the server / // agent startup code, which temporarily restores NOTIFY_SOCKET and then // unsets it again. This package owns the periodic WATCHDOG=1 pings: callers -// pass in the cached NOTIFY_SOCKET value they captured before it was -// stripped, plus a health.Group covering every component that must be alive -// for the process to be considered healthy. WATCHDOG=1 is only sent while -// every Checker in the group passes; otherwise the loop stays quiet and -// systemd will restart the unit after WatchdogSec. +// pass in the cached NOTIFY_SOCKET path and WATCHDOG_USEC interval that they +// captured before stripping, plus the set of healthz.HealthCheckers covering +// every component that must be alive for the process to be considered +// healthy. WATCHDOG=1 is only sent while every check passes; otherwise the +// loop stays quiet and systemd will restart the unit after WatchdogSec. package watchdog import ( @@ -24,31 +25,30 @@ import ( "time" systemd "github.com/coreos/go-systemd/v22/daemon" - "github.com/k3s-io/k3s/pkg/daemons/health" "github.com/sirupsen/logrus" + "k8s.io/apiserver/pkg/server/healthz" ) -func Run(ctx context.Context, socketPath string, group *health.Group) { +func Run(ctx context.Context, socketPath string, interval time.Duration, checkers []healthz.HealthChecker) { if socketPath == "" { return } - if group == nil || group.Len() == 0 { - logrus.Warn("systemd watchdog: no health checks registered, notifier disabled") - return - } - interval, err := systemd.SdWatchdogEnabled(false) - if err != nil { - logrus.Warnf("systemd watchdog: failed to read WATCHDOG_USEC: %v", err) - return - } - if interval == 0 { + if interval <= 0 { logrus.Debug("systemd watchdog: not enabled by unit, notifier disabled") return } + if len(checkers) == 0 { + logrus.Warn("systemd watchdog: no health checks registered, notifier disabled") + return + } tick := interval / 2 + names := make([]string, len(checkers)) + for i, c := range checkers { + names[i] = c.Name() + } logrus.Infof("systemd watchdog: pinging every %s (WatchdogSec=%s), monitoring components %v", - tick, interval, group.Names()) + tick, interval, names) ticker := time.NewTicker(tick) defer ticker.Stop() @@ -58,7 +58,7 @@ func Run(ctx context.Context, socketPath string, group *health.Group) { case <-ctx.Done(): return case <-ticker.C: - if name, err := group.CheckAll(ctx); err != nil { + if name, err := check(checkers); err != nil { logrus.Warnf("systemd watchdog: %q is unhealthy, withholding WATCHDOG=1: %v", name, err) continue } @@ -69,9 +69,15 @@ func Run(ctx context.Context, socketPath string, group *health.Group) { } } -// notify writes a single state line as a datagram to the systemd notify -// socket. The socket is SOCK_DGRAM, so each call is a self-contained -// message and we do not need to manage a persistent connection. +func check(checkers []healthz.HealthChecker) (string, error) { + for _, c := range checkers { + if err := c.Check(nil); err != nil { + return c.Name(), err + } + } + return "", nil +} + func notify(socketPath, state string) error { if socketPath == "" { return errors.New("watchdog: empty notify socket path") diff --git a/pkg/daemons/watchdog/watchdog_test.go b/pkg/daemons/watchdog/watchdog_test.go index e53a078df46..512e14aeef6 100644 --- a/pkg/daemons/watchdog/watchdog_test.go +++ b/pkg/daemons/watchdog/watchdog_test.go @@ -4,13 +4,14 @@ import ( "context" "errors" "net" + "net/http" "os" "path/filepath" "sync/atomic" "testing" "time" - "github.com/k3s-io/k3s/pkg/daemons/health" + "k8s.io/apiserver/pkg/server/healthz" ) // startNotifyListener opens a unix datagram socket at a temporary path and @@ -44,28 +45,31 @@ func startNotifyListener(t *testing.T) (string, <-chan string) { return path, out } -// withWatchdogEnv sets WATCHDOG_USEC for the duration of the test so the -// notify loop actually ticks. -func withWatchdogEnv(t *testing.T, usec string) { - t.Helper() - prev, had := os.LookupEnv("WATCHDOG_USEC") - if err := os.Setenv("WATCHDOG_USEC", usec); err != nil { - t.Fatalf("setenv: %v", err) - } - t.Cleanup(func() { - if had { - os.Setenv("WATCHDOG_USEC", prev) - } else { - os.Unsetenv("WATCHDOG_USEC") - } - }) +// fakeChecker returns a healthz.HealthChecker that calls fn and exposes a +// counter of invocations. +type fakeChecker struct { + name string + fn func() error + calls atomic.Int32 +} + +func (f *fakeChecker) Name() string { return f.name } +func (f *fakeChecker) Check(_ *http.Request) error { f.calls.Add(1); return f.fn() } + +func ok(name string) *fakeChecker { + return &fakeChecker{name: name, fn: func() error { return nil }} +} + +func bad(name string, err error) *fakeChecker { + return &fakeChecker{name: name, fn: func() error { return err }} } func Test_UnitWatchdogNoSocketReturnsImmediately(t *testing.T) { - g := health.NewGroup() - g.Add(health.Func{ComponentName: "x", Probe: func(_ context.Context) error { return nil }}) done := make(chan struct{}) - go func() { Run(context.Background(), "", g); close(done) }() + go func() { + Run(context.Background(), "", time.Second, []healthz.HealthChecker{ok("x")}) + close(done) + }() select { case <-done: case <-time.After(time.Second): @@ -73,51 +77,40 @@ func Test_UnitWatchdogNoSocketReturnsImmediately(t *testing.T) { } } -func Test_UnitWatchdogEmptyGroupReturnsImmediately(t *testing.T) { +func Test_UnitWatchdogZeroIntervalReturnsImmediately(t *testing.T) { socket, _ := startNotifyListener(t) - withWatchdogEnv(t, "200000") // 200ms - done := make(chan struct{}) - go func() { Run(context.Background(), socket, health.NewGroup()); close(done) }() + go func() { + Run(context.Background(), socket, 0, []healthz.HealthChecker{ok("x")}) + close(done) + }() select { case <-done: case <-time.After(time.Second): - t.Fatal("Run did not return when group was empty") + t.Fatal("Run did not return when interval was zero") } } -func Test_UnitWatchdogNoEnvReturnsImmediately(t *testing.T) { +func Test_UnitWatchdogEmptyCheckersReturnsImmediately(t *testing.T) { socket, _ := startNotifyListener(t) - // Make sure WATCHDOG_USEC is unset. - prev, had := os.LookupEnv("WATCHDOG_USEC") - os.Unsetenv("WATCHDOG_USEC") - t.Cleanup(func() { - if had { - os.Setenv("WATCHDOG_USEC", prev) - } - }) - - g := health.NewGroup() - g.Add(health.Func{ComponentName: "x", Probe: func(_ context.Context) error { return nil }}) done := make(chan struct{}) - go func() { Run(context.Background(), socket, g); close(done) }() + go func() { + Run(context.Background(), socket, time.Second, nil) + close(done) + }() select { case <-done: case <-time.After(time.Second): - t.Fatal("Run did not return when WATCHDOG_USEC was unset") + t.Fatal("Run did not return when checkers was empty") } } func Test_UnitWatchdogPingsWhenHealthy(t *testing.T) { socket, msgs := startNotifyListener(t) - withWatchdogEnv(t, "100000") - - g := health.NewGroup() - g.Add(health.Func{ComponentName: "ok", Probe: func(_ context.Context) error { return nil }}) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go Run(ctx, socket, g) + + go Run(ctx, socket, 100*time.Millisecond, []healthz.HealthChecker{ok("ok")}) select { case got := <-msgs: @@ -131,39 +124,30 @@ func Test_UnitWatchdogPingsWhenHealthy(t *testing.T) { func Test_UnitWatchdogWithholdsPingWhenUnhealthy(t *testing.T) { socket, msgs := startNotifyListener(t) - withWatchdogEnv(t, "100000") - - var calls atomic.Int32 - g := health.NewGroup() - g.Add(health.Func{ComponentName: "bad", Probe: func(_ context.Context) error { - calls.Add(1) - return errors.New("unhealthy") - }}) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go Run(ctx, socket, g) + + checker := bad("bad", errors.New("unhealthy")) + go Run(ctx, socket, 100*time.Millisecond, []healthz.HealthChecker{checker}) select { case got := <-msgs: t.Fatalf("did not expect any WATCHDOG=1 ping, got %q", got) case <-time.After(500 * time.Millisecond): } - if calls.Load() == 0 { + if checker.calls.Load() == 0 { t.Fatal("expected checker to have been invoked at least once") } } func Test_UnitWatchdogStopsOnContextCancel(t *testing.T) { socket, _ := startNotifyListener(t) - withWatchdogEnv(t, "100000") - - g := health.NewGroup() - g.Add(health.Func{ComponentName: "ok", Probe: func(_ context.Context) error { return nil }}) - ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) - go func() { Run(ctx, socket, g); close(done) }() + go func() { + Run(ctx, socket, 100*time.Millisecond, []healthz.HealthChecker{ok("ok")}) + close(done) + }() cancel() select { From 393875450fbf3ec715283aa86fb5e3210706e8b4 Mon Sep 17 00:00:00 2001 From: jokestax Date: Mon, 18 May 2026 16:05:04 +0000 Subject: [PATCH 3/3] fix(watchdog): authenticate apiserver /livez and rename probe constructors to suggesed format Signed-off-by: jokestax --- pkg/agent/run.go | 4 +- pkg/cli/server/server.go | 20 +++--- pkg/daemons/health/health.go | 41 +++++++++++- pkg/daemons/health/health_test.go | 100 +++++++++++++++++++++++++++--- 4 files changed, 141 insertions(+), 24 deletions(-) diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 6dec618b3b0..d9def1e1f9f 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -191,8 +191,8 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { // watchdog whenever kube-proxy is disabled. func agentHealthCheckers(nodeConfig *daemonconfig.Node) []healthz.HealthChecker { return []healthz.HealthChecker{ - health.HTTPGet("kubelet", "http://127.0.0.1:10248/healthz"), - health.GRPC("cri", nodeConfig.AgentConfig.RuntimeSocket), + health.NewHTTPGetHealthz("kubelet", "http://127.0.0.1:10248/healthz"), + health.NewGRPCHealthz("cri", nodeConfig.AgentConfig.RuntimeSocket), } } diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index ec24fd81b7e..c09ceb8bca0 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -646,27 +646,29 @@ func serverHealthCheckers(cc *config.Control, agentDisabled bool) []healthz.Heal host := cc.Loopback(false) if !cc.DisableAPIServer && cc.HTTPSPort > 0 { + // k3s starts kube-apiserver with --anonymous-auth=false, so /livez + // requires a client cert; the admin client cert is the simplest + // in-process identity that satisfies authn + authorization. url := fmt.Sprintf("https://%s/livez", net.JoinHostPort(host, strconv.Itoa(cc.HTTPSPort))) - checkers = append(checkers, health.HTTPGet("kube-apiserver", url)) + checkers = append(checkers, health.NewHTTPGetWithClientCertHealthz("kube-apiserver", url, + cc.Runtime.ClientAdminCert, cc.Runtime.ClientAdminKey)) } if !cc.DisableETCD { - checkers = append(checkers, health.HTTPGet("etcd", fmt.Sprintf("http://%s/health?serializable=true", net.JoinHostPort(host, "2381")))) + checkers = append(checkers, health.NewHTTPGetHealthz("etcd", fmt.Sprintf("http://%s/health?serializable=true", net.JoinHostPort(host, "2381")))) } if !cc.DisableControllerManager { - checkers = append(checkers, health.HTTPGet("kube-controller-manager", fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10257")))) + checkers = append(checkers, health.NewHTTPGetHealthz("kube-controller-manager", fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10257")))) } if !cc.DisableScheduler { - checkers = append(checkers, health.HTTPGet("kube-scheduler", fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10259")))) + checkers = append(checkers, health.NewHTTPGetHealthz("kube-scheduler", fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10259")))) } if cc.SupervisorPort > 0 { - // The supervisor has no HTTP /healthz; a TCP probe verifies the - // listener is still bound. - checkers = append(checkers, health.TCP("supervisor", net.JoinHostPort(host, strconv.Itoa(cc.SupervisorPort)))) + checkers = append(checkers, health.NewHTTPGetHealthz("supervisor", fmt.Sprintf("https://%s/ping", net.JoinHostPort(host, strconv.Itoa(cc.SupervisorPort))))) } if !agentDisabled { - checkers = append(checkers, health.HTTPGet("kubelet", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10248")))) + checkers = append(checkers, health.NewHTTPGetHealthz("kubelet", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10248")))) if !cc.DisableKubeProxy { - checkers = append(checkers, health.HTTPGet("kube-proxy", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10256")))) + checkers = append(checkers, health.NewHTTPGetHealthz("kube-proxy", fmt.Sprintf("http://%s/healthz", net.JoinHostPort(host, "10256")))) } } return checkers diff --git a/pkg/daemons/health/health.go b/pkg/daemons/health/health.go index b3ded49b0f2..824fd676e29 100644 --- a/pkg/daemons/health/health.go +++ b/pkg/daemons/health/health.go @@ -17,7 +17,7 @@ import ( const dialTimeout = 5 * time.Second -func HTTPGet(name, url string) healthz.HealthChecker { +func NewHTTPGetHealthz(name, url string) healthz.HealthChecker { client := &http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -43,7 +43,42 @@ func HTTPGet(name, url string) healthz.HealthChecker { }) } -func TCP(name, addr string) healthz.HealthChecker { +func NewHTTPGetWithClientCertHealthz(name, url, certFile, keyFile string) healthz.HealthChecker { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return healthz.NamedCheck(name, func(_ *http.Request) error { + return fmt.Errorf("load client cert %s/%s: %w", certFile, keyFile, err) + }) + } + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + Certificates: []tls.Certificate{cert}, + }, + DisableKeepAlives: true, + }, + } + return healthz.NamedCheck(name, func(_ *http.Request) error { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("get %s: %w", url, err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("get %s: status %d", url, resp.StatusCode) + } + return nil + }) +} + +func NewTCPConnectHealthz(name, addr string) healthz.HealthChecker { return healthz.NamedCheck(name, func(_ *http.Request) error { ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) defer cancel() @@ -56,7 +91,7 @@ func TCP(name, addr string) healthz.HealthChecker { }) } -func GRPC(name, target string) healthz.HealthChecker { +func NewGRPCHealthz(name, target string) healthz.HealthChecker { target = strings.TrimPrefix(target, "unix://") dialTarget := "unix:" + target return healthz.NamedCheck(name, func(_ *http.Request) error { diff --git a/pkg/daemons/health/health_test.go b/pkg/daemons/health/health_test.go index baf4254b396..fcadf8bf1ff 100644 --- a/pkg/daemons/health/health_test.go +++ b/pkg/daemons/health/health_test.go @@ -2,11 +2,20 @@ package health import ( "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" "net" "net/http" "net/http/httptest" + "os" "path/filepath" "testing" + "time" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -19,7 +28,7 @@ func Test_UnitTCPChecker(t *testing.T) { } t.Cleanup(func() { ln.Close() }) - c := TCP("ok", ln.Addr().String()) + c := NewTCPConnectHealthz("ok", ln.Addr().String()) if c.Name() != "ok" { t.Errorf("Name() = %q, want %q", c.Name(), "ok") } @@ -27,7 +36,7 @@ func Test_UnitTCPChecker(t *testing.T) { t.Errorf("expected open port to pass, got %v", err) } ln.Close() - if err := TCP("closed", ln.Addr().String()).Check(nil); err == nil { + if err := NewTCPConnectHealthz("closed", ln.Addr().String()).Check(nil); err == nil { t.Errorf("expected closed port to fail") } } @@ -43,13 +52,13 @@ func Test_UnitHTTPGetChecker(t *testing.T) { })) t.Cleanup(srv.Close) - if err := HTTPGet("ok", srv.URL+"/ok").Check(nil); err != nil { + if err := NewHTTPGetHealthz("ok", srv.URL+"/ok").Check(nil); err != nil { t.Errorf("expected 200 to pass, got %v", err) } - if err := HTTPGet("fail", srv.URL+"/fail").Check(nil); err == nil { + if err := NewHTTPGetHealthz("fail", srv.URL+"/fail").Check(nil); err == nil { t.Errorf("expected 500 to fail") } - if err := HTTPGet("dial", "http://127.0.0.1:1/never").Check(nil); err == nil { + if err := NewHTTPGetHealthz("dial", "http://127.0.0.1:1/never").Check(nil); err == nil { t.Errorf("expected unreachable URL to fail") } } @@ -60,11 +69,82 @@ func Test_UnitHTTPGetCheckerSkipsTLSVerify(t *testing.T) { w.WriteHeader(http.StatusOK) })) t.Cleanup(srv.Close) - if err := HTTPGet("tls", srv.URL+"/livez").Check(nil); err != nil { + if err := NewHTTPGetHealthz("tls", srv.URL+"/livez").Check(nil); err != nil { t.Errorf("expected TLS-skip-verify probe to pass against self-signed cert, got %v", err) } } +func Test_UnitHTTPGetWithClientCertChecker(t *testing.T) { + // TLS server that requires (and inspects) a client cert. + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if len(r.TLS.PeerCertificates) == 0 { + http.Error(w, "no client cert", http.StatusUnauthorized) + return + } + w.WriteHeader(http.StatusOK) + })) + srv.TLS = &tls.Config{ClientAuth: tls.RequireAnyClientCert} + srv.StartTLS() + t.Cleanup(srv.Close) + + // Generate a throwaway client cert + key on disk. + certPath, keyPath := writeSelfSignedCert(t) + + if err := NewHTTPGetWithClientCertHealthz("apiserver", srv.URL+"/livez", certPath, keyPath).Check(nil); err != nil { + t.Errorf("expected probe with client cert to pass, got %v", err) + } + + // Without a cert the same endpoint should 401. + if err := NewHTTPGetHealthz("apiserver-anon", srv.URL+"/livez").Check(nil); err == nil { + t.Errorf("expected anonymous probe to fail without client cert") + } +} + +func Test_UnitHTTPGetWithClientCertMissingFiles(t *testing.T) { + // Cert path doesn't exist — Check should return an error every call, + // not panic. + c := NewHTTPGetWithClientCertHealthz("apiserver", "https://127.0.0.1:1/livez", "/nonexistent.crt", "/nonexistent.key") + if err := c.Check(nil); err == nil { + t.Errorf("expected missing cert files to surface as a Check error") + } +} + +// writeSelfSignedCert generates an in-memory self-signed cert and writes the +// PEM-encoded cert and key to a tempdir. Returns their paths. +func writeSelfSignedCert(t *testing.T) (certPath, keyPath string) { + t.Helper() + key, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + t.Fatalf("generate key: %v", err) + } + tmpl := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: "test-client"}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(time.Hour), + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + } + der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key) + if err != nil { + t.Fatalf("create cert: %v", err) + } + dir := t.TempDir() + certPath = filepath.Join(dir, "client.crt") + keyPath = filepath.Join(dir, "client.key") + if err := os.WriteFile(certPath, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}), 0600); err != nil { + t.Fatalf("write cert: %v", err) + } + keyDER, err := x509.MarshalPKCS8PrivateKey(key) + if err != nil { + t.Fatalf("marshal key: %v", err) + } + if err := os.WriteFile(keyPath, pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: keyDER}), 0600); err != nil { + t.Fatalf("write key: %v", err) + } + return certPath, keyPath +} + // healthServer is a minimal implementation of grpc.health.v1.Health that // returns a configurable status — used to test the gRPC health checker // against both SERVING and NOT_SERVING responses. @@ -98,21 +178,21 @@ func startHealthServer(t *testing.T, status healthpb.HealthCheckResponse_Serving func Test_UnitGRPCCheckerServing(t *testing.T) { socket := startHealthServer(t, healthpb.HealthCheckResponse_SERVING) - if err := GRPC("cri", socket).Check(nil); err != nil { + if err := NewGRPCHealthz("cri", socket).Check(nil); err != nil { t.Errorf("expected SERVING status to pass, got %v", err) } } func Test_UnitGRPCCheckerNotServing(t *testing.T) { socket := startHealthServer(t, healthpb.HealthCheckResponse_NOT_SERVING) - if err := GRPC("cri", socket).Check(nil); err == nil { + if err := NewGRPCHealthz("cri", socket).Check(nil); err == nil { t.Errorf("expected NOT_SERVING status to fail") } } func Test_UnitGRPCCheckerStripsUnixScheme(t *testing.T) { socket := startHealthServer(t, healthpb.HealthCheckResponse_SERVING) - if err := GRPC("cri", "unix://"+socket).Check(nil); err != nil { + if err := NewGRPCHealthz("cri", "unix://"+socket).Check(nil); err != nil { t.Errorf("expected unix:// scheme to be stripped, got %v", err) } } @@ -120,7 +200,7 @@ func Test_UnitGRPCCheckerStripsUnixScheme(t *testing.T) { func Test_UnitGRPCCheckerMissingSocket(t *testing.T) { dir := t.TempDir() socket := filepath.Join(dir, "absent.sock") - if err := GRPC("cri", socket).Check(nil); err == nil { + if err := NewGRPCHealthz("cri", socket).Check(nil); err == nil { t.Errorf("expected missing socket to fail") } }