diff --git a/api/sys_health.go b/api/sys_health.go
index 6868b96d77..4379e8e08a 100644
--- a/api/sys_health.go
+++ b/api/sys_health.go
@@ -25,6 +25,8 @@ func (c *Sys) HealthWithContext(ctx context.Context) (*HealthResponse, error) {
 	r.Params.Add("standbycode", "299")
 	r.Params.Add("drsecondarycode", "299")
 	r.Params.Add("performancestandbycode", "299")
+	r.Params.Add("removedcode", "299")
+	r.Params.Add("haunhealthycode", "299")
 
 	resp, err := c.c.rawRequestWithContext(ctx, r)
 	if err != nil {
@@ -38,19 +40,22 @@ func (c *Sys) HealthWithContext(ctx context.Context) (*HealthResponse, error) {
 }
 
 type HealthResponse struct {
-	Initialized                       bool   `json:"initialized"`
-	Sealed                            bool   `json:"sealed"`
-	Standby                           bool   `json:"standby"`
-	PerformanceStandby                bool   `json:"performance_standby"`
-	ReplicationPerformanceMode        string `json:"replication_performance_mode"`
-	ReplicationDRMode                 string `json:"replication_dr_mode"`
-	ServerTimeUTC                     int64  `json:"server_time_utc"`
-	Version                           string `json:"version"`
-	ClusterName                       string `json:"cluster_name,omitempty"`
-	ClusterID                         string `json:"cluster_id,omitempty"`
-	LastWAL                           uint64 `json:"last_wal,omitempty"`
-	Enterprise                        bool   `json:"enterprise"`
-	EchoDurationMillis                int64  `json:"echo_duration_ms"`
-	ClockSkewMillis                   int64  `json:"clock_skew_ms"`
-	ReplicationPrimaryCanaryAgeMillis int64  `json:"replication_primary_canary_age_ms"`
+	Initialized                          bool   `json:"initialized"`
+	Sealed                               bool   `json:"sealed"`
+	Standby                              bool   `json:"standby"`
+	PerformanceStandby                   bool   `json:"performance_standby"`
+	ReplicationPerformanceMode           string `json:"replication_performance_mode"`
+	ReplicationDRMode                    string `json:"replication_dr_mode"`
+	ServerTimeUTC                        int64  `json:"server_time_utc"`
+	Version                              string `json:"version"`
+	ClusterName                          string `json:"cluster_name,omitempty"`
+	ClusterID                            string `json:"cluster_id,omitempty"`
+	LastWAL                              uint64 `json:"last_wal,omitempty"`
+	Enterprise                           bool   `json:"enterprise"`
+	EchoDurationMillis                   int64  `json:"echo_duration_ms"`
+	ClockSkewMillis                      int64  `json:"clock_skew_ms"`
+	ReplicationPrimaryCanaryAgeMillis    int64  `json:"replication_primary_canary_age_ms"`
+	RemovedFromCluster                   *bool  `json:"removed_from_cluster,omitempty"`
+	HAConnectionHealthy                  *bool  `json:"ha_connection_healthy,omitempty"`
+	LastRequestForwardingHeartbeatMillis int64  `json:"last_request_forwarding_heartbeat_ms,omitempty"`
 }
diff --git a/changelog/28991.txt b/changelog/28991.txt
new file mode 100644
index 0000000000..e7120258db
--- /dev/null
+++ b/changelog/28991.txt
@@ -0,0 +1,6 @@
+```release-note:change
+api: Add to sys/health whether the node has been removed from the HA cluster. If the node has been removed, return code 530 by default or the value of the `removedcode` query parameter.
+```
+```release-note:change
+api: Add to sys/health whether the standby node has been able to successfully send heartbeats to the active node and the time in milliseconds since the last heartbeat. If the standby has been unable to send a heartbeat, return code 474 by default or the value of the `haunhealthycode` query parameter.
+```
diff --git a/http/sys_health.go b/http/sys_health.go
index 0ed428d3d8..fece076022 100644
--- a/http/sys_health.go
+++ b/http/sys_health.go
@@ -158,6 +158,20 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
 		perfStandbyCode = code
 	}
 
+	haUnhealthyCode := 474
+	if code, found, ok := fetchStatusCode(r, "haunhealthycode"); !ok {
+		return http.StatusBadRequest, nil, nil
+	} else if found {
+		haUnhealthyCode = code
+	}
+
+	removedCode := 530
+	if code, found, ok := fetchStatusCode(r, "removedcode"); !ok {
+		return http.StatusBadRequest, nil, nil
+	} else if found {
+		removedCode = code
+	}
+
 	ctx := context.Background()
 
 	// Check system status
@@ -175,13 +189,21 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
 		return http.StatusInternalServerError, nil, err
 	}
 
+	removed, shouldIncludeRemoved := core.IsRemovedFromCluster()
+
+	haHealthy, lastHeartbeat := core.GetHAHeartbeatHealth()
+
 	// Determine the status code
 	code := activeCode
 	switch {
 	case !init:
 		code = uninitCode
+	case removed:
+		code = removedCode
 	case sealed:
 		code = sealedCode
+	case !haHealthy && lastHeartbeat != nil:
+		code = haUnhealthyCode
 	case replicationState.HasState(consts.ReplicationDRSecondary):
 		code = drSecondaryCode
 	case perfStandby:
@@ -233,6 +255,15 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
 		return http.StatusInternalServerError, nil, err
 	}
 
+	if shouldIncludeRemoved {
+		body.RemovedFromCluster = &removed
+	}
+
+	if lastHeartbeat != nil {
+		body.LastRequestForwardingHeartbeatMillis = lastHeartbeat.Milliseconds()
+		body.HAConnectionHealthy = &haHealthy
+	}
+
 	if licenseState != nil {
 		body.License = &HealthResponseLicense{
 			State: licenseState.State,
@@ -257,20 +288,23 @@ type HealthResponseLicense struct {
 }
 
 type HealthResponse struct {
-	Initialized                       bool                   `json:"initialized"`
-	Sealed                            bool                   `json:"sealed"`
-	Standby                           bool                   `json:"standby"`
-	PerformanceStandby                bool                   `json:"performance_standby"`
-	ReplicationPerformanceMode        string                 `json:"replication_performance_mode"`
-	ReplicationDRMode                 string                 `json:"replication_dr_mode"`
-	ServerTimeUTC                     int64                  `json:"server_time_utc"`
-	Version                           string                 `json:"version"`
-	Enterprise                        bool                   `json:"enterprise"`
-	ClusterName                       string                 `json:"cluster_name,omitempty"`
-	ClusterID                         string                 `json:"cluster_id,omitempty"`
-	LastWAL                           uint64                 `json:"last_wal,omitempty"`
-	License                           *HealthResponseLicense `json:"license,omitempty"`
-	EchoDurationMillis                int64                  `json:"echo_duration_ms"`
-	ClockSkewMillis                   int64                  `json:"clock_skew_ms"`
-	ReplicationPrimaryCanaryAgeMillis int64                  `json:"replication_primary_canary_age_ms"`
+	Initialized                          bool                   `json:"initialized"`
+	Sealed                               bool                   `json:"sealed"`
+	Standby                              bool                   `json:"standby"`
+	PerformanceStandby                   bool                   `json:"performance_standby"`
+	ReplicationPerformanceMode           string                 `json:"replication_performance_mode"`
+	ReplicationDRMode                    string                 `json:"replication_dr_mode"`
+	ServerTimeUTC                        int64                  `json:"server_time_utc"`
+	Version                              string                 `json:"version"`
+	Enterprise                           bool                   `json:"enterprise"`
+	ClusterName                          string                 `json:"cluster_name,omitempty"`
+	ClusterID                            string                 `json:"cluster_id,omitempty"`
+	LastWAL                              uint64                 `json:"last_wal,omitempty"`
+	License                              *HealthResponseLicense `json:"license,omitempty"`
+	EchoDurationMillis                   int64                  `json:"echo_duration_ms"`
+	ClockSkewMillis                      int64                  `json:"clock_skew_ms"`
+	ReplicationPrimaryCanaryAgeMillis    int64                  `json:"replication_primary_canary_age_ms"`
+	RemovedFromCluster                   *bool                  `json:"removed_from_cluster,omitempty"`
+	HAConnectionHealthy                  *bool                  `json:"ha_connection_healthy,omitempty"`
+	LastRequestForwardingHeartbeatMillis int64                  `json:"last_request_forwarding_heartbeat_ms,omitempty"`
 }
diff --git a/http/sys_health_test.go b/http/sys_health_test.go
index dcc3473b06..d2480f4f53 100644
--- a/http/sys_health_test.go
+++ b/http/sys_health_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/hashicorp/vault/helper/constants"
 	"github.com/hashicorp/vault/sdk/helper/consts"
 	"github.com/hashicorp/vault/vault"
+	"github.com/stretchr/testify/require"
 )
 
 func TestSysHealth_get(t *testing.T) {
@@ -215,3 +216,29 @@ func TestSysHealth_head(t *testing.T) {
 		}
 	}
 }
+
+// TestSysHealth_Removed checks that a removed node returns a 530 and sets
+// removed from cluster to be true. The test also checks that the removedcode
+// query parameter is respected.
+func TestSysHealth_Removed(t *testing.T) {
+	core, err := vault.TestCoreWithMockRemovableNodeHABackend(t, true)
+	require.NoError(t, err)
+	vault.TestCoreInit(t, core)
+	ln, addr := TestServer(t, core)
+	defer ln.Close()
+	raw, err := http.Get(addr + "/v1/sys/health")
+	require.NoError(t, err)
+	testResponseStatus(t, raw, 530)
+	healthResp := HealthResponse{}
+	testResponseBody(t, raw, &healthResp)
+	require.NotNil(t, healthResp.RemovedFromCluster)
+	require.True(t, *healthResp.RemovedFromCluster)
+
+	raw, err = http.Get(addr + "/v1/sys/health?removedcode=299")
+	require.NoError(t, err)
+	testResponseStatus(t, raw, 299)
+	secondHealthResp := HealthResponse{}
+	testResponseBody(t, raw, &secondHealthResp)
+	require.NotNil(t, secondHealthResp.RemovedFromCluster)
+	require.True(t, *secondHealthResp.RemovedFromCluster)
+}
diff --git a/vault/cluster/inmem_layer.go b/vault/cluster/inmem_layer.go
index aa28e153c2..3a2943d2be 100644
--- a/vault/cluster/inmem_layer.go
+++ b/vault/cluster/inmem_layer.go
@@ -116,6 +116,24 @@ func (l *InmemLayer) Listeners() []NetworkListener {
 	return []NetworkListener{l.listener}
 }
 
+// Partition forces the inmem layer to disconnect itself from peers and prevents
+// creating new connections. The returned function will add all peers back
+// and re-enable connections
+func (l *InmemLayer) Partition() (unpartition func()) {
+	l.l.Lock()
+	peersCopy := make([]*InmemLayer, 0, len(l.peers))
+	for _, peer := range l.peers {
+		peersCopy = append(peersCopy, peer)
+	}
+	l.l.Unlock()
+	l.DisconnectAll()
+	return func() {
+		for _, peer := range peersCopy {
+			l.Connect(peer)
+		}
+	}
+}
+
 // Dial implements NetworkLayer.
 func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Config) (*tls.Conn, error) {
 	l.l.Lock()
diff --git a/vault/core.go b/vault/core.go
index 6c9c90087e..bad9a45e96 100644
--- a/vault/core.go
+++ b/vault/core.go
@@ -529,6 +529,8 @@ type Core struct {
 	rpcClientConn *grpc.ClientConn
 	// The grpc forwarding client
 	rpcForwardingClient *forwardingClient
+	// The time of the last successful request forwarding heartbeat
+	rpcLastSuccessfulHeartbeat *atomic.Value
 
 	// The UUID used to hold the leader lock. Only set on active node
 	leaderUUID string
@@ -1092,6 +1094,7 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
 		echoDuration:                  uberAtomic.NewDuration(0),
 		activeNodeClockSkewMillis:     uberAtomic.NewInt64(0),
 		periodicLeaderRefreshInterval: conf.PeriodicLeaderRefreshInterval,
+		rpcLastSuccessfulHeartbeat:    new(atomic.Value),
 	}
 
 	c.standbyStopCh.Store(make(chan struct{}))
diff --git a/vault/external_tests/raft/raft_test.go b/vault/external_tests/raft/raft_test.go
index 8c3b36ae6e..dca1f43076 100644
--- a/vault/external_tests/raft/raft_test.go
+++ b/vault/external_tests/raft/raft_test.go
@@ -31,8 +31,10 @@ import (
 	vaulthttp "github.com/hashicorp/vault/http"
 	"github.com/hashicorp/vault/internalshared/configutil"
 	"github.com/hashicorp/vault/physical/raft"
+	"github.com/hashicorp/vault/sdk/helper/jsonutil"
 	"github.com/hashicorp/vault/sdk/logical"
 	"github.com/hashicorp/vault/vault"
+	"github.com/hashicorp/vault/vault/cluster"
 	vaultseal "github.com/hashicorp/vault/vault/seal"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/net/http2"
@@ -1414,3 +1416,126 @@ func TestRaftCluster_Removed_RaftConfig(t *testing.T) {
 	})
 	require.Eventually(t, follower.Sealed, 10*time.Second, 500*time.Millisecond)
 }
+
+// TestSysHealth_Raft creates a raft cluster and verifies that the health status
+// is OK for a healthy follower. The test partitions one of the nodes so that it
+// can't send request forwarding RPCs. The test verifies that the status
+// endpoint shows that HA isn't healthy. Finally, the test removes the
+// partitioned follower and unpartitions it. The follower will learn that it has
+// been removed, and should return the removed status.
+func TestSysHealth_Raft(t *testing.T) {
+	parseHealthBody := func(t *testing.T, resp *api.Response) *vaulthttp.HealthResponse {
+		t.Helper()
+		health := vaulthttp.HealthResponse{}
+		defer resp.Body.Close()
+		require.NoError(t, jsonutil.DecodeJSONFromReader(resp.Body, &health))
+		return &health
+	}
+
+	opts := &vault.TestClusterOptions{
+		HandlerFunc:        vaulthttp.Handler,
+		NumCores:           3,
+		InmemClusterLayers: true,
+	}
+	heartbeat := 500 * time.Millisecond
+	teststorage.RaftBackendSetup(nil, opts)
+	conf := &vault.CoreConfig{
+		ClusterHeartbeatInterval: heartbeat,
+	}
+	vaultCluster := vault.NewTestCluster(t, conf, opts)
+	defer vaultCluster.Cleanup()
+	testhelpers.WaitForActiveNodeAndStandbys(t, vaultCluster)
+	followerClient := vaultCluster.Cores[1].Client
+
+	t.Run("healthy", func(t *testing.T) {
+		resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
+			"perfstandbyok": {"true"},
+			"standbyok":     {"true"},
+		})
+		require.NoError(t, err)
+		require.Equal(t, resp.StatusCode, 200)
+		r := parseHealthBody(t, resp)
+		require.False(t, *r.RemovedFromCluster)
+		require.True(t, *r.HAConnectionHealthy)
+		require.Less(t, r.LastRequestForwardingHeartbeatMillis, 2*heartbeat.Milliseconds())
+	})
+
+	nl := vaultCluster.Cores[1].NetworkLayer()
+	inmem, ok := nl.(*cluster.InmemLayer)
+	require.True(t, ok)
+	unpartition := inmem.Partition()
+
+	t.Run("partition", func(t *testing.T) {
+		time.Sleep(2 * heartbeat)
+		var erroredResponse *api.Response
+		// the node isn't able to send/receive heartbeats, so it will have
+		// haunhealthy status.
+		testhelpers.RetryUntil(t, 3*time.Second, func() error {
+			resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
+				"perfstandbyok": {"true"},
+				"standbyok":     {"true"},
+			})
+			if err == nil {
+				if resp != nil && resp.Body != nil {
+					resp.Body.Close()
+				}
+				return errors.New("expected error")
+			}
+			if resp.StatusCode != 474 {
+				resp.Body.Close()
+				return fmt.Errorf("status code %d", resp.StatusCode)
+			}
+			erroredResponse = resp
+			return nil
+		})
+		r := parseHealthBody(t, erroredResponse)
+		require.False(t, *r.RemovedFromCluster)
+		require.False(t, *r.HAConnectionHealthy)
+		require.Greater(t, r.LastRequestForwardingHeartbeatMillis, 2*heartbeat.Milliseconds())
+
+		// ensure haunhealthycode is respected
+		resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
+			"perfstandbyok":   {"true"},
+			"standbyok":       {"true"},
+			"haunhealthycode": {"299"},
+		})
+		require.NoError(t, err)
+		require.Equal(t, 299, resp.StatusCode)
+		resp.Body.Close()
+	})
+
+	t.Run("remove and unpartition", func(t *testing.T) {
+		leaderClient := vaultCluster.Cores[0].Client
+		_, err := leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
+			"server_id": vaultCluster.Cores[1].NodeID,
+		})
+		require.NoError(t, err)
+		unpartition()
+
+		var erroredResponse *api.Response
+
+		// now that the node can connect again, it will start getting the removed
+		// error when trying to connect. The status code should be the removed
+		// code, and the HA connection fields will be unset because there is no
+		// HA connection.
+		testhelpers.RetryUntil(t, 10*time.Second, func() error {
+			resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
+				"perfstandbyok": {"true"},
+				"standbyok":     {"true"},
+			})
+			if err == nil {
+				if resp != nil && resp.Body != nil {
+					resp.Body.Close()
+				}
+				return fmt.Errorf("expected error")
+			}
+			if resp.StatusCode != 530 {
+				resp.Body.Close()
+				return fmt.Errorf("status code %d", resp.StatusCode)
+			}
+			erroredResponse = resp
+			return nil
+		})
+		r := parseHealthBody(t, erroredResponse)
+		require.True(t, *r.RemovedFromCluster)
+		require.Nil(t, r.HAConnectionHealthy)
+	})
+}
diff --git a/vault/ha.go b/vault/ha.go
index 46fc7f7757..2368e24f8b 100644
--- a/vault/ha.go
+++ b/vault/ha.go
@@ -46,6 +46,8 @@ const (
 	// leaderPrefixCleanDelay is how long to wait between deletions
 	// of orphaned leader keys, to prevent slamming the backend.
 	leaderPrefixCleanDelay = 200 * time.Millisecond
+
+	haAllowedMissedHeartbeats = 2
 )
 
 func init() {
@@ -1236,3 +1238,24 @@ func (c *Core) getRemovableHABackend() physical.RemovableNodeHABackend {
 
 	return haBackend
 }
+
+// GetHAHeartbeatHealth reports whether the node's last successful request
+// forwarding heartbeat occurred within the allowed number of intervals. If the
+// node's request forwarding clients were cleared (due to the node being sealed
+// or finding a new leader), or the node is uninitialized, healthy will be false.
+func (c *Core) GetHAHeartbeatHealth() (healthy bool, sinceLastHeartbeat *time.Duration) {
+	heartbeat := c.rpcLastSuccessfulHeartbeat.Load()
+	if heartbeat == nil {
+		return false, nil
+	}
+	lastHeartbeat := heartbeat.(time.Time)
+	if lastHeartbeat.IsZero() {
+		return false, nil
+	}
+	diff := time.Now().Sub(lastHeartbeat)
+	heartbeatInterval := c.clusterHeartbeatInterval
+	if heartbeatInterval <= 0 {
+		heartbeatInterval = 5 * time.Second
+	}
+	return diff < heartbeatInterval*haAllowedMissedHeartbeats, &diff
+}
diff --git a/vault/ha_test.go b/vault/ha_test.go
index 77444b99a9..b305749e0e 100644
--- a/vault/ha_test.go
+++ b/vault/ha_test.go
@@ -10,6 +10,8 @@ import (
 	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/require"
 )
 
 // TestGrabLockOrStop is a non-deterministic test to detect deadlocks in the
@@ -85,3 +87,76 @@ func TestGrabLockOrStop(t *testing.T) {
 	}
 	workerWg.Wait()
 }
+
+// TestGetHAHeartbeatHealth checks that heartbeat health is correctly determined
+// for a variety of scenarios
+func TestGetHAHeartbeatHealth(t *testing.T) {
+	now := time.Now().UTC()
+	oldLastHeartbeat := now.Add(-1 * time.Hour)
+	futureHeartbeat := now.Add(10 * time.Second)
+	zeroHeartbeat := time.Time{}
+	testCases := []struct {
+		name              string
+		lastHeartbeat     *time.Time
+		heartbeatInterval time.Duration
+		wantHealthy       bool
+	}{
+		{
+			name:              "old heartbeat",
+			lastHeartbeat:     &oldLastHeartbeat,
+			heartbeatInterval: 5 * time.Second,
+			wantHealthy:       false,
+		},
+		{
+			name:              "no heartbeat",
+			lastHeartbeat:     nil,
+			heartbeatInterval: 5 * time.Second,
+			wantHealthy:       false,
+		},
+		{
+			name:              "recent heartbeat",
+			lastHeartbeat:     &now,
+			heartbeatInterval: 20 * time.Second,
+			wantHealthy:       true,
+		},
+		{
+			name:              "recent heartbeat, empty interval",
+			lastHeartbeat:     &futureHeartbeat,
+			heartbeatInterval: 0,
+			wantHealthy:       true,
+		},
+		{
+			name:              "old heartbeat, empty interval",
+			lastHeartbeat:     &oldLastHeartbeat,
+			heartbeatInterval: 0,
+			wantHealthy:       false,
+		},
+		{
+			name:              "zero value heartbeat",
+			lastHeartbeat:     &zeroHeartbeat,
+			heartbeatInterval: 5 * time.Second,
+			wantHealthy:       false,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			v := new(atomic.Value)
+			if tc.lastHeartbeat != nil {
+				v.Store(*tc.lastHeartbeat)
+			}
+			c := &Core{
+				rpcLastSuccessfulHeartbeat: v,
+				clusterHeartbeatInterval:   tc.heartbeatInterval,
+			}
+
+			now := time.Now()
+			gotHealthy, gotLastHeartbeat := c.GetHAHeartbeatHealth()
+			require.Equal(t, tc.wantHealthy, gotHealthy)
+			if tc.lastHeartbeat != nil && !tc.lastHeartbeat.IsZero() {
+				require.InDelta(t, now.Sub(*tc.lastHeartbeat).Milliseconds(), gotLastHeartbeat.Milliseconds(), float64(3*time.Second.Milliseconds()))
+			} else {
+				require.Nil(t, gotLastHeartbeat)
+			}
+		})
+	}
+}
diff --git a/vault/request_forwarding.go b/vault/request_forwarding.go
index a0857c0613..7ad117a6cc 100644
--- a/vault/request_forwarding.go
+++ b/vault/request_forwarding.go
@@ -442,6 +442,7 @@ func (c *Core) clearForwardingClients() {
 		clusterListener.RemoveClient(consts.RequestForwardingALPN)
 	}
 	c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
+	c.rpcLastSuccessfulHeartbeat.Store(time.Time{})
 }
 
 // ForwardRequest forwards a given request to the active node and returns the
diff --git a/vault/request_forwarding_rpc.go b/vault/request_forwarding_rpc.go
index bcca7d06da..ad4c0fb809 100644
--- a/vault/request_forwarding_rpc.go
+++ b/vault/request_forwarding_rpc.go
@@ -201,6 +201,7 @@ func (c *forwardingClient) startHeartbeat() {
 			c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
 			return
 		}
+		c.core.rpcLastSuccessfulHeartbeat.Store(now)
 		if resp == nil {
 			c.core.logger.Debug("forwarding: empty echo response from active node")
 			return
@@ -214,6 +215,9 @@ func (c *forwardingClient) startHeartbeat() {
 		atomic.StoreUint32(c.core.activeNodeReplicationState, resp.ReplicationState)
 	}
 
+	// store a value before the first tick to indicate that we've started
+	// sending heartbeats
+	c.core.rpcLastSuccessfulHeartbeat.Store(time.Now())
 	tick()
 
 	for {
diff --git a/vault/testing.go b/vault/testing.go
index 41bf429acd..bd5c6d0fca 100644
--- a/vault/testing.go
+++ b/vault/testing.go
@@ -977,6 +977,14 @@ func (c *TestClusterCore) ClusterListener() *cluster.Listener {
 	return c.getClusterListener()
 }
 
+// NetworkLayer returns the network layer for the cluster core. This can be used
+// in conjunction with the cluster.InmemLayer to disconnect specific nodes from
+// the cluster when we need to simulate abrupt node failure or a network
+// partition in NewTestCluster tests.
+func (c *TestClusterCore) NetworkLayer() cluster.NetworkLayer {
+	return c.Core.clusterNetworkLayer
+}
+
 func (c *TestCluster) Cleanup() {
 	c.Logger.Info("cleaning up vault cluster")
 	if tl, ok := c.Logger.(*corehelpers.TestLogger); ok {
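Reviewer note: for anyone trying this change out, the sketch below (not part of the patch) shows how a caller might read the new response fields through the Go API client. It assumes a reachable Vault server configured via the usual `VAULT_ADDR`/`VAULT_TOKEN` environment variables; `Sys().Health()` already sends `removedcode=299` and `haunhealthycode=299` (added in `api/sys_health.go` above), so removed or HA-unhealthy nodes do not surface as request errors here.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// DefaultConfig picks up VAULT_ADDR / VAULT_TOKEN from the environment.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	health, err := client.Sys().Health()
	if err != nil {
		log.Fatal(err)
	}

	// The new fields are pointers, so they are omitted when the server does not
	// report them (no removable HA backend, or no standby request forwarding
	// connection).
	if health.RemovedFromCluster != nil {
		fmt.Println("removed from cluster:", *health.RemovedFromCluster)
	}
	if health.HAConnectionHealthy != nil {
		fmt.Printf("HA connection healthy: %v (last heartbeat %d ms ago)\n",
			*health.HAConnectionHealthy, health.LastRequestForwardingHeartbeatMillis)
	}
}
```

Without the query-parameter overrides, a plain `GET /v1/sys/health` returns 530 for a node that has been removed from the HA cluster and 474 for a standby whose request forwarding heartbeats have stopped, matching the defaults wired up in `getSysHealth` above.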