diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index d69d03b2ef3..6c2304fef33 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6571,6 +6571,8 @@ StartupXLOG(void) if (ArchiveRecoveryRequested) CleanupAfterArchiveRecovery(EndOfLogTLI, EndOfLog, newTLI); + INJECTION_POINT("promotion-after-wal-segment-cleanup", NULL); + /* * Local WAL inserts enabled, so it's time to finish initialization of * commit timestamp. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 04aa770d981..c931d9b4fa8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1104,7 +1104,29 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req am_cascading_walsender = RecoveryInProgress(); if (am_cascading_walsender) - GetXLogReplayRecPtr(&currTLI); + { + TimeLineID insertTLI; + + /* + * If the insertion timeline has already been set, use it. + * InsertTimeLineID is set before the WAL segments of the old timeline + * are removed, and before SharedRecoveryState switches to + * RECOVERY_STATE_DONE. + * + * There is a window where RecoveryInProgress() still returns true but + * the old timeline's WAL segments have already been removed or + * recycled. Using the WAL insertion timeline avoids attempting to + * read from those removed segments, improving availability, and is a + * safe thing to do as promotion copies the contents in the last + * segment of the old timeline to the first segment of the new + * timeline, up to the switchpoint. 
+ */ + insertTLI = GetWALInsertionTimeLineIfSet(); + if (insertTLI != 0) + currTLI = insertTLI; + else + GetXLogReplayRecPtr(&currTLI); + } else currTLI = GetWALInsertionTimeLine(); diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index 4421059f100..b3a5bb2694c 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -1060,4 +1060,73 @@ is($cascading_stdout, $expected, 'got same expected output from pg_recvlogical decoding session on cascading standby' ); +################################################## +# Test that logical decoding on standby correctly handles a timeline +# change during promotion. This relies on an injection point that +# waits between the moment the segments of the old timeline are removed +# and the moment RecoveryInProgress() flips to false, checking that a WAL +# sender is still able to decode changes across a promotion. +################################################## + +# Create a logical slot on the cascading standby for this test. +$node_cascading_standby->create_logical_slot_on_standby($node_standby, + 'race_slot', 'testdb'); + +$node_standby->safe_psql('testdb', + qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(10,13) s;] +); +$node_standby->wait_for_replay_catchup($node_cascading_standby); + +$expected = q{BEGIN +table public.decoding_test: INSERT: x[integer]:10 y[text]:'10' +table public.decoding_test: INSERT: x[integer]:11 y[text]:'11' +table public.decoding_test: INSERT: x[integer]:12 y[text]:'12' +table public.decoding_test: INSERT: x[integer]:13 y[text]:'13' +COMMIT}; + +$node_standby->safe_psql('testdb', 'CREATE EXTENSION injection_points;'); +$node_standby->wait_for_replay_catchup($node_cascading_standby); + +# Attach injection point to pause startup after WAL segment cleanup +# but before RecoveryInProgress() flips to false. 
+$node_cascading_standby->safe_psql('testdb', + "SELECT injection_points_attach('promotion-after-wal-segment-cleanup', 'wait');" +); + +# Promote, wait for the removal of the segments on the old timeline. +$node_cascading_standby->safe_psql('testdb', "SELECT pg_promote(false)"); +$node_cascading_standby->wait_for_event('startup', + 'promotion-after-wal-segment-cleanup'); + +# Start pg_recvlogical. +my ($stdout2, $stderr2); +my $handle2 = IPC::Run::start( + [ + 'pg_recvlogical', + '--dbname' => $node_cascading_standby->connstr('testdb'), + '--slot' => 'race_slot', + '--option' => 'include-xids=0', + '--option' => 'skip-empty-xacts=1', + '--file' => '-', + '--no-loop', + '--start', + ], + '>' => \$stdout2, + '2>' => \$stderr2, + IPC::Run::timeout($default_timeout)); + +# Verify that pg_recvlogical successfully decodes the data while startup +# is still paused in the injection point. +$pump_timeout = IPC::Run::timer($default_timeout); +ok( pump_until($handle2, $pump_timeout, \$stdout2, qr/COMMIT/s), + 'pg_recvlogical works during promotion timeline switch'); +chomp($stdout2); +is($stdout2, $expected, + 'got expected output from pg_recvlogical during promotion timeline switch' +); + +# Resume promotion by waking up the startup process. +$node_cascading_standby->safe_psql('testdb', + "SELECT injection_points_wakeup('promotion-after-wal-segment-cleanup');"); + done_testing();