Restructure ASM bgsave success paths

This commit is contained in:
ShubhamTaple 2026-05-16 18:12:48 +05:30
parent b1a53ea21f
commit af8e71bfb2
3 changed files with 56 additions and 17 deletions

View file

@ -1316,15 +1316,35 @@ void asmCallbackOnFreeClient(client *c) {
}
if (c == task->rdb_channel_client) {
/* TODO: Detect whether the bgsave is completed successfully and
* update the state properly. */
task->rdb_channel_state = ASM_COMPLETED;
/* We may not have detected whether the child process has exited yet,
* so we can't determine whether the client has completed the slots
* snapshot transfer. If the RDB channel is interrupted unexpectedly,
* the destination side will also close the main channel.
* So here we just reset the RDB channel client of task. */
task->rdb_channel_client = NULL;
/* Outcome already recorded before freeClient runs:
* - asmRdbChannelTransferSucceed (from updateSlavesWaitingBgsave)
* or the normal-disconnect path below -> ASM_COMPLETED
* - asmRdbChannelTransferFailed -> task->state == ASM_FAILED
* - asmSendStreamEofIfDrained clears c->task before freeing this
* client, so this branch is unreachable on that drain path. */
if (task->rdb_channel_state == ASM_COMPLETED) return;
if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return;
/* During an active migrate, the destination closes the RDB channel as
* soon as it receives SNAPSHOT-EOF, which can free this slave client
* before the source bgsave child exits and updateSlavesWaitingBgsave
* runs. That disconnect is not a failure; mark the rdb channel done
* and let the main channel / handoff logic continue. */
if (task->operation == ASM_MIGRATE &&
(task->state == ASM_SEND_BULK_AND_STREAM ||
task->state == ASM_SEND_STREAM ||
task->state == ASM_HANDOFF_PREP ||
task->state == ASM_HANDOFF ||
task->state == ASM_STREAM_EOF))
{
task->rdb_channel_state = ASM_COMPLETED;
return;
}
asmTaskSetFailed(task,
"RDB channel - Connection closed before snapshot completed");
return;
}
@ -1707,8 +1727,12 @@ void asmSlotSnapshotAndStreamStart(struct asmTask *task) {
task->dest_slots_snapshot_time = server.mstime;
}
/* Called when the RDB channel has succeeded in sending the snapshot. */
void asmSlotSnapshotSucceed(struct asmTask *task) {
/* Called when the main channel finishes bulk delivery, i.e. bgsave succeeded
* and the main-channel slave can now switch from bulk+stream to stream-only.
* Invoked from updateSlavesWaitingBgsave() while iterating the main-channel
* slave (replstate == SLAVE_STATE_SEND_BULK_AND_STREAM). The RDB channel side
* is signaled separately via asmRdbChannelTransferSucceed. */
void asmMainChannelTransferSucceed(struct asmTask *task) {
if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
/* The destination starts sending ACKs to keep the main channel alive after
@ -1717,11 +1741,20 @@ void asmSlotSnapshotSucceed(struct asmTask *task) {
task->main_channel_client->lastinteraction = server.unixtime;
task->state = ASM_SEND_STREAM;
}
/* Record that the RDB channel delivered the slot snapshot successfully by
 * setting the task's rdb_channel_state to ASM_COMPLETED. A NULL task is
 * ignored.
 *
 * updateSlavesWaitingBgsave() calls this for a CLIENT_ASM_MIGRATING slave
 * that requested a slots snapshot (SLAVE_REQ_SLOTS_SNAPSHOT) when the child
 * type is RDB_CHILD_TYPE_SOCKET, right before that slave client is handed to
 * freeClientAsync(). Counterpart of asmRdbChannelTransferFailed. */
void asmRdbChannelTransferSucceed(struct asmTask *task) {
    if (task != NULL) {
        task->rdb_channel_state = ASM_COMPLETED;
    }
}
/* Called when the RDB channel fails to send the snapshot. */
void asmSlotSnapshotFailed(struct asmTask *task) {
void asmRdbChannelTransferFailed(struct asmTask *task) {
if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot");

View file

@ -23,8 +23,9 @@ void asmInit(void);
void asmBeforeSleep(void);
void asmCron(void);
void asmSlotSnapshotAndStreamStart(struct asmTask *task);
void asmSlotSnapshotSucceed(struct asmTask *task);
void asmSlotSnapshotFailed(struct asmTask *task);
void asmMainChannelTransferSucceed(struct asmTask *task);
void asmRdbChannelTransferSucceed(struct asmTask *task);
void asmRdbChannelTransferFailed(struct asmTask *task);
void asmCallbackOnFreeClient(client *c);
int asmMigrateInProgress(void);
int asmImportInProgress(void);

View file

@ -1976,9 +1976,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
/* This is the main channel of the slave that received the RDB.
* Put it online if RDB delivery is successful. */
if (bgsaveerr == C_OK) {
/* Notify the task that the snapshot bulk delivery is done */
/* Notify the task that the main-channel bulk delivery is done */
if (slave->flags & CLIENT_ASM_MIGRATING)
asmSlotSnapshotSucceed(slave->task);
asmMainChannelTransferSucceed(slave->task);
replicaPutOnline(slave);
} else {
freeClientAsync(slave);
@ -1987,9 +1987,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
struct redis_stat buf;
if (bgsaveerr != C_OK) {
/* Notify the task that the snapshot bulk delivery failed */
/* Notify the task that the rdb-channel snapshot transfer failed */
if (slave->flags & CLIENT_ASM_MIGRATING)
asmSlotSnapshotFailed(slave->task);
asmRdbChannelTransferFailed(slave->task);
freeClientAsync(slave);
serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
@ -2003,6 +2003,11 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
if (type == RDB_CHILD_TYPE_SOCKET) {
/* Slots snapshot */
if (slave->slave_req & SLAVE_REQ_SLOTS_SNAPSHOT) {
/* Notify the task that the rdb-channel snapshot transfer
* succeeded so it can set rdb_channel_state to ASM_COMPLETED
* before the slave client is freed. */
if (slave->flags & CLIENT_ASM_MIGRATING)
asmRdbChannelTransferSucceed(slave->task);
serverLog(LL_NOTICE, "Streamed slots snapshot transfer succeeded");
freeClientAsync(slave);
continue;