diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3d4ab929f91..04aa770d981 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -209,6 +209,13 @@ static bool waiting_for_ping_response = false; /* Timestamp when walsender received the shutdown request */ static TimestampTz shutdown_request_timestamp = 0; +/* + * Set after queueing the CommandComplete message that ends WAL streaming + * during shutdown. This prevents WalSndDone() and WalSndDoneImmediate() + * from queueing the same message twice. + */ +static bool shutdown_stream_done_queued = false; + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -3713,15 +3720,17 @@ WalSndDoneImmediate(void) { WalSndState state = MyWalSnd->state; - if (state == WALSNDSTATE_CATCHUP || - state == WALSNDSTATE_STREAMING || - state == WALSNDSTATE_STOPPING) + if ((state == WALSNDSTATE_CATCHUP || + state == WALSNDSTATE_STREAMING || + state == WALSNDSTATE_STOPPING) && + !shutdown_stream_done_queued) { QueryCompletion qc; /* Try to inform receiver that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); - EndCommand(&qc, DestRemote, false); + EndCommandExtended(&qc, DestRemote, false, true); + shutdown_stream_done_queued = true; /* * Note that the output buffer may be full during the forced shutdown @@ -3778,10 +3787,55 @@ WalSndDone(WalSndSendDataCallback send_data) { QueryCompletion qc; + Assert(!shutdown_stream_done_queued); + /* Inform the standby that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); - EndCommand(&qc, DestRemote, false); - pq_flush(); + EndCommandExtended(&qc, DestRemote, false, true); + shutdown_stream_done_queued = true; + + /* + * Reset last_reply_timestamp so subsequent WalSndComputeSleeptime() + * calls ignore wal_sender_timeout during shutdown. + */ + last_reply_timestamp = 0; + + /* + * Do not call pq_flush() here, since it can block indefinitely while + * waiting for the socket to become writable, preventing + * wal_sender_shutdown_timeout from being enforced. Instead, use the + * walsender nonblocking flush path so the shutdown timeout continues + * to be checked while the send buffer drains. + */ + for (;;) + { + long sleeptime; + + /* + * During shutdown, die if the shutdown timeout expires. Call this + * before WalSndComputeSleeptime() so the timeout is considered + * when computing sleep time. + */ + WalSndCheckShutdownTimeout(); + + if (!pq_is_send_pending()) + break; + + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + /* Sleep until something happens or we time out */ + WalSndWait(WL_SOCKET_WRITEABLE, sleeptime, + WAIT_EVENT_WAL_SENDER_WRITE_DATA); + + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + } proc_exit(0); } diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index fb163930c89..bdc3dad3357 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -165,8 +165,10 @@ CreateDestReceiver(CommandDest dest) * EndCommand - clean up the destination at end of command * ---------------- */ + void -EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) +EndCommandExtended(const QueryCompletion *qc, CommandDest dest, + bool force_undecorated_output, bool noblock) { char completionTag[COMPLETION_TAG_BUFSIZE]; Size len; @@ -179,7 +181,10 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o len = BuildQueryCompletionString(completionTag, qc, force_undecorated_output); - pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1); + if (noblock) + pq_putmessage_noblock(PqMsg_CommandComplete, completionTag, len + 1); + else + pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1); break; case DestNone: @@ -196,6 +201,12 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o } } +void +EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) +{ + EndCommandExtended(qc, dest, force_undecorated_output, false); +} + /* ---------------- * EndReplicationCommand - stripped down version of EndCommand * diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 4e4f532d8cc..103f27fc3cb 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -136,6 +136,8 @@ extern PGDLLIMPORT DestReceiver *None_Receiver; /* permanent receiver for extern void BeginCommand(CommandTag commandTag, CommandDest dest); extern DestReceiver *CreateDestReceiver(CommandDest dest); +extern void EndCommandExtended(const QueryCompletion *qc, CommandDest dest, + bool force_undecorated_output, bool noblock); extern void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output); extern void EndReplicationCommand(const char *commandTag);