diff --git a/src/cluster_asm.c b/src/cluster_asm.c
index ac94a5103..6e94dca5f 100644
--- a/src/cluster_asm.c
+++ b/src/cluster_asm.c
@@ -1316,15 +1316,35 @@ void asmCallbackOnFreeClient(client *c) {
     }
 
     if (c == task->rdb_channel_client) {
-        /* TODO: Detect whether the bgsave is completed successfully and
-         * update the state properly. */
-        task->rdb_channel_state = ASM_COMPLETED;
-        /* We may not have detected whether the child process has exited yet,
-         * so we can't determine whether the client has completed the slots
-         * snapshot transfer. If the RDB channel is interrupted unexpectedly,
-         * the destination side will also close the main channel.
-         * So here we just reset the RDB channel client of task. */
         task->rdb_channel_client = NULL;
+
+        /* Outcome already recorded before freeClient runs:
+         * - asmRdbChannelTransferSucceed (from updateSlavesWaitingBgsave)
+         *   or the normal-disconnect path below -> ASM_COMPLETED
+         * - asmRdbChannelTransferFailed -> ASM_FAILED (ASM_CANCELED is
+         *   skipped too); asmSendStreamEofIfDrained clears c->task before
+         *   freeing this client, so that drain path never gets here. */
+        if (task->rdb_channel_state == ASM_COMPLETED) return;
+        if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return;
+
+        /* During an active migrate, the destination closes the RDB channel as
+         * soon as it receives SNAPSHOT-EOF, which can free this slave client
+         * before the source bgsave child exits and updateSlavesWaitingBgsave
+         * runs. That disconnect is not a failure; mark the rdb channel done
+         * and let the main channel / handoff logic continue. */
+        if (task->operation == ASM_MIGRATE &&
+            (task->state == ASM_SEND_BULK_AND_STREAM ||
+             task->state == ASM_SEND_STREAM ||
+             task->state == ASM_HANDOFF_PREP ||
+             task->state == ASM_HANDOFF ||
+             task->state == ASM_STREAM_EOF))
+        {
+            task->rdb_channel_state = ASM_COMPLETED;
+            return;
+        }
+
+        asmTaskSetFailed(task,
+            "RDB channel - Connection closed before snapshot completed");
         return;
     }
 
@@ -1707,8 +1727,12 @@ void asmSlotSnapshotAndStreamStart(struct asmTask *task) {
     task->dest_slots_snapshot_time = server.mstime;
 }
 
-/* Called when the RDB channel has succeeded in sending the snapshot. */
-void asmSlotSnapshotSucceed(struct asmTask *task) {
+/* Called when the main channel finishes bulk delivery, i.e. bgsave succeeded
+ * and the main-channel slave can now switch from bulk+stream to stream-only.
+ * Invoked from updateSlavesWaitingBgsave() while iterating the main-channel
+ * slave (replstate == SLAVE_STATE_SEND_BULK_AND_STREAM). The RDB channel side
+ * is signaled separately via asmRdbChannelTransferSucceed. */
+void asmMainChannelTransferSucceed(struct asmTask *task) {
     if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
 
     /* The destination starts sending ACKs to keep the main channel alive after
@@ -1717,11 +1741,20 @@ void asmSlotSnapshotSucceed(struct asmTask *task) {
     task->main_channel_client->lastinteraction = server.unixtime;
 
     task->state = ASM_SEND_STREAM;
+}
+
+/* Called when the RDB channel has streamed the slot snapshot successfully.
+ * Counterpart of asmRdbChannelTransferFailed, but guarded only against a
+ * NULL task (no task->state check). Invoked from updateSlavesWaitingBgsave()
+ * for the rdb-channel slave (replstate == SLAVE_STATE_WAIT_BGSAVE_END,
+ * RDB_CHILD_TYPE_SOCKET + SLAVE_REQ_SLOTS_SNAPSHOT). */
+void asmRdbChannelTransferSucceed(struct asmTask *task) {
+    if (task == NULL) return;
     task->rdb_channel_state = ASM_COMPLETED;
 }
 
 /* Called when the RDB channel fails to send the snapshot. */
-void asmSlotSnapshotFailed(struct asmTask *task) {
+void asmRdbChannelTransferFailed(struct asmTask *task) {
     if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
 
     asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot");
diff --git a/src/cluster_asm.h b/src/cluster_asm.h
index b08837b8e..e45735078 100644
--- a/src/cluster_asm.h
+++ b/src/cluster_asm.h
@@ -23,8 +23,9 @@ void asmInit(void);
 void asmBeforeSleep(void);
 void asmCron(void);
 void asmSlotSnapshotAndStreamStart(struct asmTask *task);
-void asmSlotSnapshotSucceed(struct asmTask *task);
-void asmSlotSnapshotFailed(struct asmTask *task);
+void asmMainChannelTransferSucceed(struct asmTask *task);
+void asmRdbChannelTransferSucceed(struct asmTask *task);
+void asmRdbChannelTransferFailed(struct asmTask *task);
 void asmCallbackOnFreeClient(client *c);
 int asmMigrateInProgress(void);
 int asmImportInProgress(void);
diff --git a/src/replication.c b/src/replication.c
index aaedabd12..4f2d0f089 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1976,9 +1976,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
             /* This is the main channel of the slave that received the RDB.
              * Put it online if RDB delivery is successful. */
             if (bgsaveerr == C_OK) {
-                /* Notify the task that the snapshot bulk delivery is done */
+                /* Notify the task that the main-channel bulk delivery is done */
                 if (slave->flags & CLIENT_ASM_MIGRATING)
-                    asmSlotSnapshotSucceed(slave->task);
+                    asmMainChannelTransferSucceed(slave->task);
                 replicaPutOnline(slave);
             } else {
                 freeClientAsync(slave);
@@ -1987,9 +1987,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
             struct redis_stat buf;
 
             if (bgsaveerr != C_OK) {
-                /* Notify the task that the snapshot bulk delivery failed */
+                /* Notify the task that the rdb-channel snapshot transfer failed */
                 if (slave->flags & CLIENT_ASM_MIGRATING)
-                    asmSlotSnapshotFailed(slave->task);
+                    asmRdbChannelTransferFailed(slave->task);
                 freeClientAsync(slave);
                 serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
                 continue;
@@ -2003,6 +2003,11 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
             if (type == RDB_CHILD_TYPE_SOCKET) {
                 /* Slots snapshot */
                 if (slave->slave_req & SLAVE_REQ_SLOTS_SNAPSHOT) {
+                    /* Notify the task that the rdb-channel snapshot transfer
+                     * succeeded so it can mark rdb_channel_state = ASM_COMPLETED
+                     * before the slave client is freed. */
+                    if (slave->flags & CLIENT_ASM_MIGRATING)
+                        asmRdbChannelTransferSucceed(slave->task);
                     serverLog(LL_NOTICE, "Streamed slots snapshot transfer succeeded");
                     freeClientAsync(slave);
                     continue;