mirror of
https://github.com/redis/redis.git
synced 2026-05-28 04:02:46 -04:00
Cluster master can trigger failover on shutdown.
best replica (closest to master by repl_ack_off), sends a forced failover command via the replication link, and marks itself as slave,fail before exiting. The promoted replica takes over slots immediately instead of waiting for cluster-node-timeout.
This commit is contained in:
parent
138263a1b4
commit
1b980b8cc3
6 changed files with 146 additions and 2 deletions
|
|
@ -85,6 +85,7 @@ void clusterCommonInit(void);
|
|||
void clusterCron(void);
|
||||
void clusterBeforeSleep(void);
|
||||
void clusterClaimUnassignedSlots(void);
|
||||
void clusterHandleServerShutdown(bool auto_failover);
|
||||
int verifyClusterConfigWithData(void);
|
||||
|
||||
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ void clusterUpdateState(void);
|
|||
int clusterNodeCoversSlot(clusterNode *n, int slot);
|
||||
list *clusterGetNodesInMyShard(clusterNode *node);
|
||||
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
|
||||
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave);
|
||||
int clusterAddSlot(clusterNode *n, int slot);
|
||||
int clusterDelSlot(int slot);
|
||||
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node);
|
||||
|
|
@ -64,7 +65,13 @@ void bitmapClearBit(unsigned char *bitmap, int pos);
|
|||
void clusterDoBeforeSleep(int flags);
|
||||
void clusterSendUpdate(clusterLink *link, clusterNode *node);
|
||||
void resetManualFailover(void);
|
||||
void clusterWriteHandler(connection *conn);
|
||||
void clusterBroadcastPong(int target);
|
||||
void clusterCloseAllSlots(void);
|
||||
|
||||
/* Broadcast targets for clusterBroadcastPong */
|
||||
#define CLUSTER_BROADCAST_ALL 0
|
||||
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
|
||||
void clusterSetNodeAsMaster(clusterNode *n);
|
||||
void clusterDelNode(clusterNode *delnode);
|
||||
sds representClusterNodeFlags(sds ci, uint16_t flags);
|
||||
|
|
@ -1064,6 +1071,101 @@ void clusterInitLast(void) {
|
|||
}
|
||||
}
|
||||
|
||||
void clusterAutoFailoverOnShutdown(void) {
|
||||
if (!nodeIsMaster(myself)) return;
|
||||
|
||||
/* Find the replica with the largest replication offset, i.e. closest to master.
|
||||
* Iterate cluster slave nodes and match to replication clients by port. */
|
||||
client *best_replica = NULL;
|
||||
clusterNode *best_node = NULL;
|
||||
long long best_offset = -1;
|
||||
|
||||
dictIterator di;
|
||||
dictEntry *de;
|
||||
dictInitSafeIterator(&di, server.cluster->nodes);
|
||||
while ((de = dictNext(&di)) != NULL) {
|
||||
clusterNode *node = dictGetVal(de);
|
||||
if (node->slaveof != myself) continue;
|
||||
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(server.slaves, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
client *replica = listNodeValue(ln);
|
||||
if (replica->replstate != SLAVE_STATE_ONLINE) continue;
|
||||
if (replica->repl_ack_off <= best_offset) continue;
|
||||
if (node->tcp_port != replica->slave_listening_port) continue;
|
||||
/* Verify IP if slave_addr is set (e.g. via slave-announce-ip). */
|
||||
if (replica->slave_addr && strcmp(node->ip, replica->slave_addr) != 0)
|
||||
continue;
|
||||
|
||||
best_offset = replica->repl_ack_off;
|
||||
best_replica = replica;
|
||||
best_node = node;
|
||||
}
|
||||
}
|
||||
dictResetIterator(&di);
|
||||
|
||||
if (best_replica == NULL) {
|
||||
serverLog(LL_NOTICE, "Unable to find a replica to perform the auto failover on shutdown.");
|
||||
return;
|
||||
}
|
||||
|
||||
serverLog(LL_NOTICE, "Auto failover on shutdown: replica %.40s offset %lld (master offset %lld).",
|
||||
best_node->name, (long long)best_offset, (long long)server.master_repl_offset);
|
||||
|
||||
/* Send CLUSTER FAILOVER FORCE REPLICAID <id> via replication buffer.
|
||||
* replicationFeedSlaves broadcasts to all replicas; the REPLICAID arg
|
||||
* ensures only the intended replica acts on it. */
|
||||
robj *cmd_argv[5];
|
||||
cmd_argv[0] = createStringObject("CLUSTER", 7);
|
||||
cmd_argv[1] = createStringObject("FAILOVER", 8);
|
||||
cmd_argv[2] = createStringObject("FORCE", 5);
|
||||
cmd_argv[3] = createStringObject("REPLICAID", 9);
|
||||
cmd_argv[4] = createStringObject(best_node->name, CLUSTER_NAMELEN);
|
||||
replicationFeedSlaves(server.slaves, -1, cmd_argv, 5);
|
||||
for (int i = 0; i < 5; i++) decrRefCount(cmd_argv[i]);
|
||||
serverLog(LL_NOTICE, "Perform auto failover to replica %.40s on shutdown.", best_node->name);
|
||||
|
||||
/* Swap roles before exiting: mark myself as a slave of the promoted replica,
|
||||
* so cluster nodes shows it as "slave,fail" instead of "master ... disconnected". */
|
||||
clusterNodeRemoveSlave(myself, best_node);
|
||||
for (int j = 0; j < CLUSTER_SLOTS; j++) {
|
||||
if (clusterNodeCoversSlot(myself, j)) {
|
||||
clusterDelSlot(j);
|
||||
clusterAddSlot(best_node, j);
|
||||
}
|
||||
}
|
||||
myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
|
||||
myself->flags |= CLUSTER_NODE_SLAVE;
|
||||
myself->slaveof = best_node;
|
||||
updateShardId(myself, best_node->shard_id);
|
||||
clusterNodeAddSlave(best_node, myself);
|
||||
clusterCloseAllSlots();
|
||||
|
||||
/* Broadcast our new role via cluster bus so peers see "slave,fail" */
|
||||
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
|
||||
/* Flush the queued PONGs synchronously since the event loop won't run during shutdown. */
|
||||
dictIterator bdi;
|
||||
dictEntry *bde;
|
||||
dictInitSafeIterator(&bdi, server.cluster->nodes);
|
||||
while((bde = dictNext(&bdi)) != NULL) {
|
||||
clusterNode *node = dictGetVal(bde);
|
||||
if (node->link) clusterWriteHandler(node->link->conn);
|
||||
}
|
||||
dictResetIterator(&bdi);
|
||||
}
|
||||
|
||||
/* Called when a cluster node receives SHUTDOWN. */
|
||||
void clusterHandleServerShutdown(bool auto_failover) {
|
||||
/* Check if we are able to do the auto failover on shutdown. */
|
||||
if (auto_failover) clusterAutoFailoverOnShutdown();
|
||||
|
||||
/* The error logs have been logged in the save function if the save fails. */
|
||||
serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
|
||||
clusterSaveConfig(1);
|
||||
}
|
||||
|
||||
/* Reset a node performing a soft or hard reset:
|
||||
*
|
||||
* 1) All other nodes are forgotten.
|
||||
|
|
@ -3099,6 +3201,12 @@ int clusterProcessPacket(clusterLink *link) {
|
|||
int slots = clusterMoveNodeSlots(sender, master);
|
||||
/* `master` is still a `slave` in this observer node's view; update its role and configEpoch */
|
||||
clusterSetNodeAsMaster(master);
|
||||
if (master == myself) {
|
||||
/* When promoting ourselves via cluster bus (e.g., after a
|
||||
* clean shutdown-failover), we must also clear replication
|
||||
* state so we stop trying to connect to the old master. */
|
||||
replicationUnsetMaster();
|
||||
}
|
||||
master->configEpoch = senderConfigEpoch;
|
||||
serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)"
|
||||
" lost %d slot(s) to node %.40s (%s) with a config epoch of %llu",
|
||||
|
|
@ -6305,9 +6413,9 @@ int clusterCommandSpecial(client *c) {
|
|||
addReplyLongLong(c,clusterNodeFailureReportsCount(n));
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
|
||||
(c->argc == 2 || c->argc == 3))
|
||||
(c->argc == 2 || c->argc == 3 || c->argc == 5))
|
||||
{
|
||||
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
|
||||
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [REPLICAID <NODE ID>] */
|
||||
int force = 0, takeover = 0;
|
||||
|
||||
if (c->argc == 3) {
|
||||
|
|
@ -6320,6 +6428,28 @@ int clusterCommandSpecial(client *c) {
|
|||
addReplyErrorObject(c,shared.syntaxerr);
|
||||
return 1;
|
||||
}
|
||||
} else if (c->argc == 5) {
|
||||
/* CLUSTER FAILOVER FORCE REPLICAID <node-id> */
|
||||
if (strcasecmp(c->argv[2]->ptr,"force") ||
|
||||
strcasecmp(c->argv[3]->ptr,"replicaid"))
|
||||
{
|
||||
addReplyErrorObject(c,shared.syntaxerr);
|
||||
return 1;
|
||||
}
|
||||
/* If this REPLICAID doesn't match our node, silently ignore.
|
||||
* node->name is CLUSTER_NAMELEN bytes, not null-terminated. */
|
||||
if (memcmp(c->argv[4]->ptr, myself->name, CLUSTER_NAMELEN)) {
|
||||
addReply(c,shared.ok);
|
||||
return 1;
|
||||
}
|
||||
force = 1;
|
||||
/* If we're already a master (promoted before this command was
|
||||
* processed), the failover is done — silently ignore to avoid
|
||||
* sending an error back via the replication link. */
|
||||
if (clusterNodeIsMaster(myself)) {
|
||||
addReply(c,shared.ok);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check preconditions. */
|
||||
|
|
|
|||
|
|
@ -93,6 +93,7 @@ configEnum shutdown_on_sig_enum[] = {
|
|||
{"nosave", SHUTDOWN_NOSAVE},
|
||||
{"now", SHUTDOWN_NOW},
|
||||
{"force", SHUTDOWN_FORCE},
|
||||
{"failover", SHUTDOWN_FAILOVER},
|
||||
{NULL, 0}
|
||||
};
|
||||
|
||||
|
|
|
|||
7
src/db.c
7
src/db.c
|
|
@ -2137,6 +2137,8 @@ void shutdownCommand(client *c) {
|
|||
flags |= SHUTDOWN_NOW;
|
||||
} else if (!strcasecmp(c->argv[i]->ptr, "force")) {
|
||||
flags |= SHUTDOWN_FORCE;
|
||||
} else if (!strcasecmp(c->argv[i]->ptr, "failover")) {
|
||||
flags |= SHUTDOWN_FAILOVER;
|
||||
} else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
|
||||
abort = 1;
|
||||
} else {
|
||||
|
|
@ -2152,6 +2154,11 @@ void shutdownCommand(client *c) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (flags & SHUTDOWN_FAILOVER && !server.cluster_enabled) {
|
||||
addReplyError(c, "SHUTDOWN FAILOVER is only supported in cluster mode.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (abort) {
|
||||
if (abortShutdown() == C_OK)
|
||||
addReply(c, shared.ok);
|
||||
|
|
|
|||
|
|
@ -5006,6 +5006,7 @@ int finishShutdown(void) {
|
|||
int save = server.shutdown_flags & SHUTDOWN_SAVE;
|
||||
int nosave = server.shutdown_flags & SHUTDOWN_NOSAVE;
|
||||
int force = server.shutdown_flags & SHUTDOWN_FORCE;
|
||||
bool failover = (server.shutdown_flags & SHUTDOWN_FAILOVER) != 0;
|
||||
|
||||
/* Log a warning for each replica that is lagging. */
|
||||
listIter replicas_iter;
|
||||
|
|
@ -5142,6 +5143,9 @@ int finishShutdown(void) {
|
|||
unlink(server.pidfile);
|
||||
}
|
||||
|
||||
/* Handle cluster-related matters when shutdown. */
|
||||
if (server.cluster_enabled) clusterHandleServerShutdown(failover);
|
||||
|
||||
/* Best effort flush of slave output buffers, so that we hopefully
|
||||
* send them pending writes. */
|
||||
flushSlavesOutputBuffers();
|
||||
|
|
|
|||
|
|
@ -711,6 +711,7 @@ typedef enum {
|
|||
#define SHUTDOWN_NOSAVE 2 /* Don't SAVE on SHUTDOWN. */
|
||||
#define SHUTDOWN_NOW 4 /* Don't wait for replicas to catch up. */
|
||||
#define SHUTDOWN_FORCE 8 /* Don't let errors prevent shutdown. */
|
||||
#define SHUTDOWN_FAILOVER 16 /* Perform cluster failover on shutdown. */
|
||||
|
||||
/* Cluster slot stats flags */
|
||||
#define CLUSTER_SLOT_STATS_CPU 1 /* Track CPU usage per slot. */
|
||||
|
|
|
|||
Loading…
Reference in a new issue