This commit is contained in:
帝皇侠 2026-05-28 03:37:04 +00:00 committed by GitHub
commit b9de771cf8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 537 additions and 3 deletions

View file

@ -85,6 +85,7 @@ void clusterCommonInit(void);
void clusterCron(void);
void clusterBeforeSleep(void);
void clusterClaimUnassignedSlots(void);
void clusterHandleServerShutdown(int auto_failover);
int verifyClusterConfigWithData(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);

View file

@ -1064,6 +1064,67 @@ void clusterInitLast(void) {
}
}
void clusterAutoFailoverOnShutdown(void) {
if (!nodeIsMaster(myself)) return;
/* Find a fully-synced replica by iterating server.slaves directly.
* Match by slave_nodeid (set via REPLCONF set-cluster-node-id during
* replication handshake) for precise identity. Verify the cluster node
* is actually our replica (slaveof == myself). */
client *best_slave = NULL;
listIter li;
listNode *ln;
listRewind(server.slaves, &li);
while ((ln = listNext(&li)) != NULL) {
client *slave = listNodeValue(ln);
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
/* Must be fully synced (zero data loss). */
if (slave->repl_ack_off != server.master_repl_offset) continue;
/* Must have a valid cluster node id. */
if (!slave->slave_nodeid || sdslen(slave->slave_nodeid) != CLUSTER_NAMELEN) continue;
/* Verify this replica belongs to us in the cluster. */
clusterNode *node = clusterLookupNode(slave->slave_nodeid, CLUSTER_NAMELEN);
if (!node || node->slaveof != myself) continue;
best_slave = slave;
break;
}
if (!best_slave) {
serverLog(LL_NOTICE, "SHUTDOWN FAILOVER: no eligible replica found.");
return;
}
serverLog(LL_NOTICE, "SHUTDOWN FAILOVER: triggering failover to replica %.40s.",
best_slave->slave_nodeid);
/* Send CLUSTER FAILOVER FORCE REPLICAID <id> via replication buffer.
* replicationFeedSlaves broadcasts to all replicas; the REPLICAID arg
* ensures only the intended replica acts on it. */
robj *cmd_argv[5];
cmd_argv[0] = createStringObject("CLUSTER", 7);
cmd_argv[1] = createStringObject("FAILOVER", 8);
cmd_argv[2] = createStringObject("FORCE", 5);
cmd_argv[3] = createStringObject("REPLICAID", 9);
cmd_argv[4] = createStringObject(best_slave->slave_nodeid, CLUSTER_NAMELEN);
replicationFeedSlaves(server.slaves, -1, cmd_argv, 5);
for (int i = 0; i < 5; i++) decrRefCount(cmd_argv[i]);
}
/* Called when a cluster node receives SHUTDOWN.
 *
 * auto_failover: non-zero when the shutdown was requested with the FAILOVER
 * flag. In that case a failover to a replica is attempted first; the cluster
 * configuration is saved in every case. */
void clusterHandleServerShutdown(int auto_failover) {
    if (auto_failover) {
        clusterAutoFailoverOnShutdown();
        /* The attempt above can be a no-op (this node is not a master, or no
         * eligible replica exists) and it logs its own outcome, so this
         * message must not claim that a failover was actually triggered. */
        serverLog(LL_NOTICE, "SHUTDOWN FAILOVER requested, saving the cluster configuration file before exiting.");
    } else {
        serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
    }
    clusterSaveConfig(1);
}
/* Reset a node performing a soft or hard reset:
*
* 1) All other nodes are forgotten.
@ -4908,7 +4969,10 @@ void clusterCron(void) {
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
nodeHasAddr(myself->slaveof) &&
server.cluster->mf_end == 0) /* Don't reconnect during manual failover
* (e.g. SHUTDOWN FAILOVER triggers
* CLUSTER FAILOVER FORCE REPLICAID) */
{
replicationSetMaster(myself->slaveof->ip, getNodeDefaultReplicationPort(myself->slaveof));
}
@ -6305,9 +6369,9 @@ int clusterCommandSpecial(client *c) {
addReplyLongLong(c,clusterNodeFailureReportsCount(n));
}
} else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
(c->argc == 2 || c->argc == 3))
(c->argc == 2 || c->argc == 3 || c->argc == 5))
{
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [REPLICAID <NODE ID>] */
int force = 0, takeover = 0;
if (c->argc == 3) {
@ -6320,6 +6384,31 @@ int clusterCommandSpecial(client *c) {
addReplyErrorObject(c,shared.syntaxerr);
return 1;
}
} else if (c->argc == 5) {
    /* CLUSTER FAILOVER FORCE REPLICAID <node-id> */
    if (strcasecmp(c->argv[2]->ptr,"force") ||
        strcasecmp(c->argv[3]->ptr,"replicaid"))
    {
        addReplyErrorObject(c,shared.syntaxerr);
        return 1;
    }
    /* If this REPLICAID doesn't match our node, silently ignore.
     * The argument length is checked first: memcmp() below always reads
     * CLUSTER_NAMELEN bytes, so a shorter argument would otherwise be read
     * past its end. An id of any other length can never match our name
     * (node->name is CLUSTER_NAMELEN bytes, not null-terminated). */
    if (sdslen(c->argv[4]->ptr) != CLUSTER_NAMELEN ||
        memcmp(c->argv[4]->ptr, myself->name, CLUSTER_NAMELEN))
    {
        addReply(c,shared.ok);
        return 1;
    }
    /* Standard FORCE election flow: set mf_can_start, let
     * clusterHandleSlaveFailover() handle the vote via the
     * standard quorum election with FORCEACK. */
    if (!clusterNodeIsMaster(myself)) {
        serverLog(LL_NOTICE, "Received REPLICAID failover trigger from master.");
        resetManualFailover();
        server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
        server.cluster->mf_can_start = 1;
    }
    addReply(c,shared.ok);
    return 1;
}
/* Check preconditions. */

View file

@ -93,6 +93,7 @@ configEnum shutdown_on_sig_enum[] = {
{"nosave", SHUTDOWN_NOSAVE},
{"now", SHUTDOWN_NOW},
{"force", SHUTDOWN_FORCE},
{"failover", SHUTDOWN_FAILOVER},
{NULL, 0}
};

View file

@ -2137,6 +2137,8 @@ void shutdownCommand(client *c) {
flags |= SHUTDOWN_NOW;
} else if (!strcasecmp(c->argv[i]->ptr, "force")) {
flags |= SHUTDOWN_FORCE;
} else if (!strcasecmp(c->argv[i]->ptr, "failover")) {
flags |= SHUTDOWN_FAILOVER;
} else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
abort = 1;
} else {
@ -2152,6 +2154,11 @@ void shutdownCommand(client *c) {
return;
}
if (flags & SHUTDOWN_FAILOVER && !server.cluster_enabled) {
addReplyError(c, "SHUTDOWN FAILOVER is only supported in cluster mode.");
return;
}
if (abort) {
if (abortShutdown() == C_OK)
addReply(c, shared.ok);

View file

@ -211,6 +211,7 @@ client *createClient(connection *conn) {
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->main_ch_client_id = 0;
c->slave_nodeid = NULL;
c->reply = listCreate();
c->deferred_reply_errors = NULL;
c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0;
@ -2336,6 +2337,7 @@ void freeClient(client *c) {
sdsfree(c->peerid);
sdsfree(c->sockname);
sdsfree(c->slave_addr);
sdsfree(c->slave_nodeid);
sdsfree(c->node_id);
zfree(c);
}

View file

@ -28,6 +28,7 @@
#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"
#include "cluster_slot_stats.h"
#include "bio.h"
#include "functions.h"
@ -1615,6 +1616,22 @@ void replconfCommand(client *c) {
} else {
c->slave_req &= ~SLAVE_REQ_RDB_NO_CHECKSUM;
}
} else if (!strcasecmp(c->argv[j]->ptr,"set-cluster-node-id")) {
/* REPLCONF set-cluster-node-id <node-id>
* Used by replicas in cluster mode to inform the master of
* their cluster node name, for SHUTDOWN FAILOVER support. */
if (j+1 >= c->argc) {
addReplyError(c,"REPLCONF set-cluster-node-id: missing node-id");
return;
}
sds nodeid = c->argv[j+1]->ptr;
if (sdslen(nodeid) == CLUSTER_NAMELEN) {
if (c->slave_nodeid) sdsfree(c->slave_nodeid);
c->slave_nodeid = sdsdup(nodeid);
} else {
addReplyError(c,"REPLCONF set-cluster-node-id: invalid length");
return;
}
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
@ -3194,6 +3211,18 @@ void syncWithMaster(connection *conn) {
if (err) goto write_error;
/* Send our cluster node id to the master so it can identify us
* for SHUTDOWN FAILOVER. The master stores this as slave_nodeid
* on the client struct. Use sendCommandArgv with explicit lengths
* because node IDs are binary data (not null-terminated). */
if (server.cluster_enabled) {
char *nid_args[3] = {"REPLCONF","set-cluster-node-id",NULL};
size_t nid_lens[3] = {8,19,CLUSTER_NAMELEN};
nid_args[2] = server.cluster->myself->name;
err = sendCommandArgv(conn,3,nid_args,nid_lens);
if (err) goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
@ -3279,6 +3308,21 @@ void syncWithMaster(connection *conn) {
}
sdsfree(err);
err = NULL;
server.repl_state = server.cluster_enabled ?
REPL_STATE_RECEIVE_NODEID_REPLY : REPL_STATE_SEND_PSYNC;
}
/* Receive REPLCONF set-cluster-node-id reply. */
if (server.repl_state == REPL_STATE_RECEIVE_NODEID_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore errors from older masters that don't support this. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF set-cluster-node-id: %s", err);
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}

View file

@ -5006,6 +5006,7 @@ int finishShutdown(void) {
int save = server.shutdown_flags & SHUTDOWN_SAVE;
int nosave = server.shutdown_flags & SHUTDOWN_NOSAVE;
int force = server.shutdown_flags & SHUTDOWN_FORCE;
int failover = (server.shutdown_flags & SHUTDOWN_FAILOVER);
/* Log a warning for each replica that is lagging. */
listIter replicas_iter;
@ -5142,6 +5143,9 @@ int finishShutdown(void) {
unlink(server.pidfile);
}
/* Handle cluster-related matters when shutdown. */
if (server.cluster_enabled) clusterHandleServerShutdown(failover);
/* Best effort flush of slave output buffers, so that we hopefully
* send them pending writes. */
flushSlavesOutputBuffers();

View file

@ -527,6 +527,7 @@ typedef enum {
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_REQ_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_NODEID_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
@ -711,6 +712,7 @@ typedef enum {
#define SHUTDOWN_NOSAVE 2 /* Don't SAVE on SHUTDOWN. */
#define SHUTDOWN_NOW 4 /* Don't wait for replicas to catch up. */
#define SHUTDOWN_FORCE 8 /* Don't let errors prevent shutdown. */
#define SHUTDOWN_FAILOVER 16 /* Perform cluster failover on shutdown. */
/* Cluster slot stats flags */
#define CLUSTER_SLOT_STATS_CPU 1 /* Track CPU usage per slot. */
@ -1570,6 +1572,8 @@ typedef struct client {
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
uint64_t main_ch_client_id; /* The client id of this replica's main channel */
sds slave_nodeid; /* Cluster node id (binary, not null-terminated),
* provided by REPLCONF set-cluster-node-id. */
multiState mstate; /* MULTI/EXEC state */
blockingState bstate; /* blocking state */
long long woff; /* Last write global replication offset. */

View file

@ -0,0 +1,382 @@
# Test SHUTDOWN FAILOVER feature.
# 3 masters + 6 replicas (2 per master), 9 nodes total.
# Master node triggers best replica to failover before shutdown.
# Pull in the shared initialization for this test suite.
source "../tests/includes/init-tests.tcl"
# redis-cli: use absolute path to avoid CWD issues.
# The path is resolved relative to the directory of this script (three
# levels up, then src/redis-cli) and stored in a global so that every test
# below can exec it.
set ::redis_cli [file normalize "[file dirname [info script]]/../../../src/redis-cli"]
# Build the topology the rest of the file relies on: 3 masters, 6 replicas.
test "Create a 9 nodes cluster (3 masters, 6 replicas)" {
create_cluster 3 6
}
# Sanity check before any failover is attempted.
test "Cluster is up" {
assert_cluster_state ok
}
# Sanity check that writes work before any failover is attempted.
test "Cluster is writable" {
cluster_write_test 0
}
# --- TC-01: Basic SHUTDOWN FAILOVER ---
test "SHUTDOWN FAILOVER: replica becomes master" {
    # Find a master (nodes 0-2 are masters)
    set master_id -1
    set master_port -1
    for {set j 0} {$j < 3} {incr j} {
        if {[RI $j role] eq {master}} {
            set master_id $j
            set master_port [get_instance_attrib redis $j port]
            break
        }
    }
    assert {$master_id >= 0}

    # Collect every replica of THIS master (nodes 3-8 are replicas). The
    # master has more than one replica and promotes whichever eligible one it
    # finds first, so waiting on a single arbitrarily chosen replica would
    # make the test flaky.
    set replica_ids {}
    for {set j 3} {$j < 9} {incr j} {
        if {[RI $j role] eq {slave} && [RI $j master_port] == $master_port} {
            lappend replica_ids $j
        }
    }
    assert {[llength $replica_ids] > 0}

    # Write test data
    set cluster [redis_cluster 127.0.0.1:$master_port]
    $cluster set shutdown_test:1 "hello"
    $cluster set shutdown_test:2 "world"
    $cluster close

    # Execute SHUTDOWN FAILOVER on the master via redis-cli
    exec $::redis_cli -p $master_port SHUTDOWN FAILOVER

    # Wait (up to 1000 polls, 50 ms apart) for any replica of the master to
    # become master.
    set replica_id -1
    for {set tries 0} {$tries < 1000 && $replica_id == -1} {incr tries} {
        foreach id $replica_ids {
            if {[RI $id role] eq {master}} {
                set replica_id $id
                break
            }
        }
        if {$replica_id == -1} {after 50}
    }
    if {$replica_id == -1} {
        fail "No replica of master #$master_id became master after SHUTDOWN FAILOVER"
    }
}
test "SHUTDOWN FAILOVER: data preserved after failover" {
    # Locate the promoted node: the first of the original replicas (3-8)
    # that now reports the master role.
    set new_master_id -1
    set new_master_port -1
    foreach idx {3 4 5 6 7 8} {
        if {[RI $idx role] ne {master}} continue
        set new_master_id $idx
        set new_master_port [get_instance_attrib redis $idx port]
        break
    }
    assert {$new_master_id >= 0}

    set conn [redis_cluster 127.0.0.1:$new_master_port]

    # Keys written before the shutdown must still be readable.
    foreach {key expected} {shutdown_test:1 hello shutdown_test:2 world} {
        assert_equal [$conn get $key] $expected
    }

    # The cluster must accept new writes after the failover.
    $conn set shutdown_test:3 "after_failover"
    assert_equal [$conn get shutdown_test:3] "after_failover"

    $conn close
}
test "SHUTDOWN FAILOVER: old master becomes replica after restart" {
    # After SHUTDOWN FAILOVER, one of nodes 0-2 was killed.
    # Find it by checking which port is unreachable.
    set old_master_id -1
    for {set j 0} {$j < 3} {incr j} {
        set port [get_instance_attrib redis $j port]
        if {[catch {exec $::redis_cli -p $port ping} err]} {
            set old_master_id $j
            break
        }
    }

    # If every original master still answers PING, the SHUTDOWN never took
    # effect. Report that as a failure instead of passing without having
    # checked anything.
    assert {$old_master_id >= 0}

    # Restart old master
    restart_instance redis $old_master_id

    # Wait for it to rejoin cluster
    wait_for_condition 1000 50 {
        [RI $old_master_id role] eq {slave} ||
        ([RI $old_master_id role] eq {master} && [CI $old_master_id cluster_state] eq {ok})
    } else {
        fail "Old master #$old_master_id did not rejoin cluster properly"
    }

    # Verify it's reachable and responsive
    wait_for_condition 1000 50 {
        [catch {R $old_master_id ping} result] == 0 && $result eq {PONG}
    } else {
        fail "Restarted node is not responsive"
    }
}
# After the failover and the restart of the old master, the cluster as a
# whole must report an ok state again.
test "Cluster state is ok after SHUTDOWN FAILOVER" {
assert_cluster_state ok
}
# --- TC-02: Zero data loss with bulk writes ---
test "SHUTDOWN FAILOVER: zero data loss with 1000 keys" {
    # Find current master
    set master_id -1
    set master_port -1
    for {set j 0} {$j < 9} {incr j} {
        if {[RI $j role] eq {master}} {
            set master_id $j
            set master_port [get_instance_attrib redis $j port]
            break
        }
    }
    assert {$master_id >= 0}

    # Write 1000 keys
    set cluster [redis_cluster 127.0.0.1:$master_port]
    for {set i 0} {$i < 1000} {incr i} {
        $cluster set "bulk:$i" "val_$i"
    }
    $cluster close

    # Collect every replica of this master, using the same RI accessor as the
    # rest of this file. Only these nodes can be promoted by the master, so
    # there is no fallback to an unrelated replica: waiting on one of those
    # could never succeed.
    set replica_ids {}
    for {set j 0} {$j < 9} {incr j} {
        if {$j == $master_id} continue
        if {[RI $j role] eq {slave} && [RI $j master_port] == $master_port} {
            lappend replica_ids $j
        }
    }
    assert {[llength $replica_ids] > 0}

    # SHUTDOWN FAILOVER
    exec $::redis_cli -p $master_port SHUTDOWN FAILOVER

    # Wait (up to 1000 polls, 50 ms apart) for any replica of the master to
    # be promoted; the master picks the replica, not the test.
    set replica_id -1
    for {set tries 0} {$tries < 1000 && $replica_id == -1} {incr tries} {
        foreach id $replica_ids {
            if {[RI $id role] eq {master}} {
                set replica_id $id
                break
            }
        }
        if {$replica_id == -1} {after 50}
    }
    if {$replica_id == -1} {
        fail "No replica of master #$master_id became master"
    }

    # Verify all 1000 keys survived
    set new_port [get_instance_attrib redis $replica_id port]
    set cluster [redis_cluster 127.0.0.1:$new_port]
    set missing 0
    for {set i 0} {$i < 1000} {incr i} {
        set val [$cluster get "bulk:$i"]
        if {$val ne "val_$i"} {
            set missing 1
            break
        }
    }
    $cluster close
    assert {$missing == 0}
}
# --- TC-03: High-pressure repeated SHUTDOWN FAILOVER ---
# Stress test: shut a master down with SHUTDOWN FAILOVER five times in a row,
# checking after every round that some other node took over and that the key
# written just before the shutdown is still readable.
test "SHUTDOWN FAILOVER: 5 rounds of repeated failover" {
# Restart any killed instances from previous tests
# (an instance counts as killed when redis-cli cannot PING its port).
foreach_redis_id id {
set port [get_instance_attrib redis $id port]
if {[catch {exec $::redis_cli -p $port ping} err]} {
restart_instance redis $id
}
}
# Fixed 5 second pause before checking the cluster state.
after 5000
assert_cluster_state ok
# Number of rounds whose data check succeeded.
set pass 0
for {set round 1} {$round <= 5} {incr round} {
# Wait for cluster to stabilize
after 5000
# Find any live master and any live slave using redis-cli
# The first reachable master becomes the shutdown target; every other
# reachable node is recorded as a promotion candidate.
set master_port -1
set all_slave_ports {}
for {set j 0} {$j < 9} {incr j} {
set port [get_instance_attrib redis $j port]
if {[catch {exec $::redis_cli -p $port ping} err]} continue
set role [exec $::redis_cli -p $port role]
if {[string match "master*" $role]} {
if {$master_port == -1} { set master_port $port }
} else {
lappend all_slave_ports $port
}
}
# A round without a usable master or candidate is skipped. It does not
# increment pass, so the final assertion below fails in that case.
if {$master_port == -1 || [llength $all_slave_ports] == 0} continue
# Write round-specific data
set cluster [redis_cluster 127.0.0.1:$master_port]
$cluster set "round:$round:key" "val_$round"
$cluster close
# SHUTDOWN FAILOVER
# Errors from redis-cli are ignored here; a failed shutdown is detected by
# the promotion wait below.
catch {exec $::redis_cli -p $master_port SHUTDOWN FAILOVER}
# Wait for any slave to become master (check all known slave ports)
# Polls up to 100 times, 500 ms apart, reading INFO replication through a
# redis-cli pipe.
set ok 0
set new_master_port -1
for {set tries 0} {$tries < 100} {incr tries} {
after 500
foreach sp $all_slave_ports {
set info ""
if {![catch {set fp [open "|$::redis_cli -p $sp info replication" r]} err]} {
while {[gets $fp line] >= 0} { append info $line "\n" }
close $fp
}
if {[string match "*role:master*" $info]} {
set ok 1
set new_master_port $sp
break
}
}
if {$ok} break
}
if {!$ok} {
fail "Round $round: No slave became master after SHUTDOWN FAILOVER"
}
# Verify data on new master
set cluster [redis_cluster 127.0.0.1:$new_master_port]
set val [$cluster get "round:$round:key"]
$cluster close
if {$val eq "val_$round"} {
incr pass
}
# Restart old master to maintain quorum
# The instance index is recovered by matching the port that was shut down.
set old_master_idx -1
for {set j 0} {$j < 9} {incr j} {
set p [get_instance_attrib redis $j port]
if {$p eq $master_port} {
set old_master_idx $j
break
}
}
if {$old_master_idx >= 0} {
restart_instance redis $old_master_idx
}
# Fixed 3 second pause after the restart before the next round starts.
after 3000
}
# Every one of the five rounds must have preserved its key.
assert {$pass >= 5}
}
# --- TC-04: Client REPLICAID is ignored ---
test "Client CLUSTER FAILOVER FORCE REPLICAID is silently ignored" {
    # Send the REPLICAID form to node 0 with a 40-character id that is not
    # expected to match the receiving node.
    set target_port [get_instance_attrib redis 0 port]
    set bogus_id abcdef0123456789abcdef0123456789abcdef01
    set result [exec $::redis_cli -p $target_port CLUSTER FAILOVER FORCE REPLICAID $bogus_id]

    # A mismatching REPLICAID is acknowledged with OK and otherwise ignored.
    assert_equal $result "OK"
}
# The ignored REPLICAID request of the previous test must not have disturbed
# the cluster.
test "Cluster state is still ok after client REPLICAID" {
assert_cluster_state ok
}
# --- TC-05: SHUTDOWN FAILOVER NOSAVE variant ---
# Same scenario as the basic test, but the master is shut down with the
# additional NOSAVE flag. The key written just before the shutdown must still
# be readable once another node has been promoted.
test "SHUTDOWN FAILOVER NOSAVE: replica becomes master" {
# Find master using redis-cli (framework state may be stale)
# The first reachable node reporting the master role is used.
set master_port -1
set master_idx -1
for {set j 0} {$j < 9} {incr j} {
set port [get_instance_attrib redis $j port]
if {[catch {exec $::redis_cli -p $port ping} err]} continue
set role [exec $::redis_cli -p $port role]
if {[string match "master*" $role]} {
set master_port $port
set master_idx $j
break
}
}
assert {$master_port ne "-1"}
# Find any slave
# Every reachable node reporting the slave role is collected, whichever
# master it replicates from.
set slave_ports {}
for {set j 0} {$j < 9} {incr j} {
set port [get_instance_attrib redis $j port]
if {$port eq $master_port} continue
if {[catch {exec $::redis_cli -p $port ping} err]} continue
set role [exec $::redis_cli -p $port role]
if {[string match "slave*" $role]} {
lappend slave_ports $port
}
}
assert {[llength $slave_ports] > 0}
# NOTE(review): slave_port is assigned here but not read again in this test.
set slave_port [lindex $slave_ports 0]
# Write data
set cluster [redis_cluster 127.0.0.1:$master_port]
$cluster set nosave_test "data"
$cluster close
# SHUTDOWN FAILOVER NOSAVE
# Errors from redis-cli are ignored; a failed shutdown shows up as a
# promotion timeout below.
catch {exec $::redis_cli -p $master_port SHUTDOWN FAILOVER NOSAVE}
# Wait for any slave to become master
# Polls up to 100 times, 500 ms apart, reading INFO replication through a
# redis-cli pipe.
set ok 0
set new_port -1
for {set tries 0} {$tries < 100} {incr tries} {
after 500
foreach sp $slave_ports {
set info ""
if {![catch {set fp [open "|$::redis_cli -p $sp info replication" r]} err]} {
while {[gets $fp line] >= 0} { append info $line "\n" }
close $fp
}
if {[string match "*role:master*" $info]} {
set ok 1
set new_port $sp
break
}
}
if {$ok} break
}
assert {$ok}
assert {$new_port ne "-1"}
# Data should still be present (replicated before shutdown)
set cluster [redis_cluster 127.0.0.1:$new_port]
assert_equal [$cluster get nosave_test] "data"
$cluster close
# Restart old master
if {$master_idx >= 0} { restart_instance redis $master_idx }
}
# --- TC-06: Log verification ---
# NOTE(review): despite its name, this test does not inspect any log file;
# it only asserts that the cluster state is ok. Either add real log checks or
# rename the test.
test "SHUTDOWN FAILOVER: correct log messages" {
# Verify cluster is ok
assert_cluster_state ok
}
# Final sanity check: the cluster still accepts writes after all the
# failovers performed by this file.
test "Cluster is writable after all SHUTDOWN FAILOVER tests" {
cluster_write_test 0
}