From 1b980b8cc3b715d9c9a4824bfb3c19f456df4eee Mon Sep 17 00:00:00 2001 From: a981008 <2587450776@qq.com> Date: Tue, 26 May 2026 22:58:59 +0800 Subject: [PATCH] Cluster master can trigger failover on shutdown. When shut down with the failover option, the master picks the best replica (closest to master by repl_ack_off), sends a forced failover command via the replication link, and marks itself as slave,fail before exiting. The promoted replica takes over slots immediately instead of waiting for cluster-node-timeout. --- src/cluster.h | 1 + src/cluster_legacy.c | 134 ++++++++++++++++++++++++++++++++++++++++++- src/config.c | 1 + src/db.c | 7 +++ src/server.c | 4 ++ src/server.h | 1 + 6 files changed, 146 insertions(+), 2 deletions(-) diff --git a/src/cluster.h b/src/cluster.h index a124f18cc..41754d08d 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -85,6 +85,7 @@ void clusterCommonInit(void); void clusterCron(void); void clusterBeforeSleep(void); void clusterClaimUnassignedSlots(void); +void clusterHandleServerShutdown(bool auto_failover); int verifyClusterConfigWithData(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index c93aea2ad..73a3c82c8 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -50,6 +50,7 @@ void clusterUpdateState(void); int clusterNodeCoversSlot(clusterNode *n, int slot); list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); +int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node); @@ -64,7 +65,13 @@ void bitmapClearBit(unsigned char *bitmap, int pos); void clusterDoBeforeSleep(int flags); void clusterSendUpdate(clusterLink *link, clusterNode *node); void resetManualFailover(void); +void clusterWriteHandler(connection *conn); +void 
clusterBroadcastPong(int target); void clusterCloseAllSlots(void); + +/* Broadcast targets for clusterBroadcastPong */ +#define CLUSTER_BROADCAST_ALL 0 +#define CLUSTER_BROADCAST_LOCAL_SLAVES 1 void clusterSetNodeAsMaster(clusterNode *n); void clusterDelNode(clusterNode *delnode); sds representClusterNodeFlags(sds ci, uint16_t flags); @@ -1064,6 +1071,101 @@ void clusterInitLast(void) { } } +void clusterAutoFailoverOnShutdown(void) { + if (!nodeIsMaster(myself)) return; + + /* Find the replica with the largest replication offset, i.e. closest to master. + * Iterate cluster slave nodes and match to replication clients by port. */ + client *best_replica = NULL; + clusterNode *best_node = NULL; + long long best_offset = -1; + + dictIterator di; + dictEntry *de; + dictInitSafeIterator(&di, server.cluster->nodes); + while ((de = dictNext(&di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node->slaveof != myself) continue; + + listIter li; + listNode *ln; + listRewind(server.slaves, &li); + while ((ln = listNext(&li)) != NULL) { + client *replica = listNodeValue(ln); + if (replica->replstate != SLAVE_STATE_ONLINE) continue; + if (replica->repl_ack_off <= best_offset) continue; + if (node->tcp_port != replica->slave_listening_port) continue; + /* Verify IP if slave_addr is set (e.g. via slave-announce-ip). */ + if (replica->slave_addr && strcmp(node->ip, replica->slave_addr) != 0) + continue; + + best_offset = replica->repl_ack_off; + best_replica = replica; + best_node = node; + } + } + dictResetIterator(&di); + + if (best_replica == NULL) { + serverLog(LL_NOTICE, "Unable to find a replica to perform the auto failover on shutdown."); + return; + } + + serverLog(LL_NOTICE, "Auto failover on shutdown: replica %.40s offset %lld (master offset %lld).", + best_node->name, (long long)best_offset, (long long)server.master_repl_offset); + + /* Send CLUSTER FAILOVER FORCE REPLICAID via replication buffer. 
+ * replicationFeedSlaves broadcasts to all replicas; the REPLICAID arg + * ensures only the intended replica acts on it. */ + robj *cmd_argv[5]; + cmd_argv[0] = createStringObject("CLUSTER", 7); + cmd_argv[1] = createStringObject("FAILOVER", 8); + cmd_argv[2] = createStringObject("FORCE", 5); + cmd_argv[3] = createStringObject("REPLICAID", 9); + cmd_argv[4] = createStringObject(best_node->name, CLUSTER_NAMELEN); + replicationFeedSlaves(server.slaves, -1, cmd_argv, 5); + for (int i = 0; i < 5; i++) decrRefCount(cmd_argv[i]); + serverLog(LL_NOTICE, "Perform auto failover to replica %.40s on shutdown.", best_node->name); + + /* Swap roles before exiting: mark myself as a slave of the promoted replica, + * so cluster nodes shows it as "slave,fail" instead of "master ... disconnected". */ + clusterNodeRemoveSlave(myself, best_node); + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (clusterNodeCoversSlot(myself, j)) { + clusterDelSlot(j); + clusterAddSlot(best_node, j); + } + } + myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO); + myself->flags |= CLUSTER_NODE_SLAVE; + myself->slaveof = best_node; + updateShardId(myself, best_node->shard_id); + clusterNodeAddSlave(best_node, myself); + clusterCloseAllSlots(); + + /* Broadcast our new role via cluster bus so peers see "slave,fail" */ + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); + /* Flush the queued PONGs synchronously since the event loop won't run during shutdown. */ + dictIterator bdi; + dictEntry *bde; + dictInitSafeIterator(&bdi, server.cluster->nodes); + while((bde = dictNext(&bdi)) != NULL) { + clusterNode *node = dictGetVal(bde); + if (node->link) clusterWriteHandler(node->link->conn); + } + dictResetIterator(&bdi); +} + +/* Called when a cluster node receives SHUTDOWN. */ +void clusterHandleServerShutdown(bool auto_failover) { + /* Check if we are able to do the auto failover on shutdown. 
*/ + if (auto_failover) clusterAutoFailoverOnShutdown(); + + /* The error logs have been logged in the save function if the save fails. */ + serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting."); + clusterSaveConfig(1); +} + /* Reset a node performing a soft or hard reset: * * 1) All other nodes are forgotten. @@ -3099,6 +3201,12 @@ int clusterProcessPacket(clusterLink *link) { int slots = clusterMoveNodeSlots(sender, master); /* `master` is still a `slave` in this observer node's view; update its role and configEpoch */ clusterSetNodeAsMaster(master); + if (master == myself) { + /* When promoting ourselves via cluster bus (e.g., after a + * clean shutdown-failover), we must also clear replication + * state so we stop trying to connect to the old master. */ + replicationUnsetMaster(); + } master->configEpoch = senderConfigEpoch; serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)" " lost %d slot(s) to node %.40s (%s) with a config epoch of %llu", @@ -6305,9 +6413,9 @@ int clusterCommandSpecial(client *c) { addReplyLongLong(c,clusterNodeFailureReportsCount(n)); } } else if (!strcasecmp(c->argv[1]->ptr,"failover") && - (c->argc == 2 || c->argc == 3)) + (c->argc == 2 || c->argc == 3 || c->argc == 5)) { - /* CLUSTER FAILOVER [FORCE|TAKEOVER] */ + /* CLUSTER FAILOVER [FORCE|TAKEOVER] [REPLICAID <node-id>] */ int force = 0, takeover = 0; if (c->argc == 3) { @@ -6320,6 +6428,28 @@ int clusterCommandSpecial(client *c) { addReplyErrorObject(c,shared.syntaxerr); return 1; } + } else if (c->argc == 5) { + /* CLUSTER FAILOVER FORCE REPLICAID <node-id> */ + if (strcasecmp(c->argv[2]->ptr,"force") || + strcasecmp(c->argv[3]->ptr,"replicaid")) + { + addReplyErrorObject(c,shared.syntaxerr); + return 1; + } + /* If this REPLICAID doesn't match our node, silently ignore. + * node->name is CLUSTER_NAMELEN bytes, not null-terminated. 
*/ + if (memcmp(c->argv[4]->ptr, myself->name, CLUSTER_NAMELEN)) { + addReply(c,shared.ok); + return 1; + } + force = 1; + /* If we're already a master (promoted before this command was + * processed), the failover is done — silently ignore to avoid + * sending an error back via the replication link. */ + if (clusterNodeIsMaster(myself)) { + addReply(c,shared.ok); + return 1; + } } /* Check preconditions. */ diff --git a/src/config.c b/src/config.c index 97fa58a11..1f61012e0 100644 --- a/src/config.c +++ b/src/config.c @@ -93,6 +93,7 @@ configEnum shutdown_on_sig_enum[] = { {"nosave", SHUTDOWN_NOSAVE}, {"now", SHUTDOWN_NOW}, {"force", SHUTDOWN_FORCE}, + {"failover", SHUTDOWN_FAILOVER}, {NULL, 0} }; diff --git a/src/db.c b/src/db.c index 87881a991..cb5e8b8a5 100644 --- a/src/db.c +++ b/src/db.c @@ -2137,6 +2137,8 @@ void shutdownCommand(client *c) { flags |= SHUTDOWN_NOW; } else if (!strcasecmp(c->argv[i]->ptr, "force")) { flags |= SHUTDOWN_FORCE; + } else if (!strcasecmp(c->argv[i]->ptr, "failover")) { + flags |= SHUTDOWN_FAILOVER; } else if (!strcasecmp(c->argv[i]->ptr, "abort")) { abort = 1; } else { @@ -2152,6 +2154,11 @@ void shutdownCommand(client *c) { return; } + if (flags & SHUTDOWN_FAILOVER && !server.cluster_enabled) { + addReplyError(c, "SHUTDOWN FAILOVER is only supported in cluster mode."); + return; + } + if (abort) { if (abortShutdown() == C_OK) addReply(c, shared.ok); diff --git a/src/server.c b/src/server.c index df660175e..2493b3da3 100644 --- a/src/server.c +++ b/src/server.c @@ -5006,6 +5006,7 @@ int finishShutdown(void) { int save = server.shutdown_flags & SHUTDOWN_SAVE; int nosave = server.shutdown_flags & SHUTDOWN_NOSAVE; int force = server.shutdown_flags & SHUTDOWN_FORCE; + bool failover = (server.shutdown_flags & SHUTDOWN_FAILOVER) != 0; /* Log a warning for each replica that is lagging. */ listIter replicas_iter; @@ -5142,6 +5143,9 @@ int finishShutdown(void) { unlink(server.pidfile); } + /* Handle cluster-related matters when shutdown. 
*/ + if (server.cluster_enabled) clusterHandleServerShutdown(failover); + /* Best effort flush of slave output buffers, so that we hopefully * send them pending writes. */ flushSlavesOutputBuffers(); diff --git a/src/server.h b/src/server.h index 9318eec68..c8893dd2a 100644 --- a/src/server.h +++ b/src/server.h @@ -711,6 +711,7 @@ typedef enum { #define SHUTDOWN_NOSAVE 2 /* Don't SAVE on SHUTDOWN. */ #define SHUTDOWN_NOW 4 /* Don't wait for replicas to catch up. */ #define SHUTDOWN_FORCE 8 /* Don't let errors prevent shutdown. */ +#define SHUTDOWN_FAILOVER 16 /* Perform cluster failover on shutdown. */ /* Cluster slot stats flags */ #define CLUSTER_SLOT_STATS_CPU 1 /* Track CPU usage per slot. */