diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 85060d19a49..603a2b94d05 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1986,16 +1986,22 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) } /* - * Read up to the end of WAL starting from the decoding slot's restart_lsn. - * Return true if any meaningful/decodable WAL records are encountered, - * otherwise false. + * Read up to the end of WAL starting from the decoding slot's restart_lsn + * to end_of_wal in order to check if any meaningful/decodable WAL records + * are encountered. scan_cutoff_lsn is the LSN, where we can terminate the + * WAL scan early if we find a decodable WAL record after this LSN. + * + * Returns the last LSN decodable WAL record's LSN if found, otherwise + * returns InvalidXLogRecPtr. */ -bool -LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) +XLogRecPtr +LogicalReplicationSlotCheckPendingWal(XLogRecPtr end_of_wal, + XLogRecPtr scan_cutoff_lsn) { - bool has_pending_wal = false; + XLogRecPtr last_pending_wal = InvalidXLogRecPtr; Assert(MyReplicationSlot); + Assert(end_of_wal >= scan_cutoff_lsn); PG_TRY(); { @@ -2023,8 +2029,7 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) /* Invalidate non-timetravel entries */ InvalidateSystemCaches(); - /* Loop until the end of WAL or some changes are processed */ - while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal) + while (ctx->reader->EndRecPtr < end_of_wal) { XLogRecord *record; char *errm = NULL; @@ -2037,7 +2042,20 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) if (record != NULL) LogicalDecodingProcessRecord(ctx, ctx->reader); - has_pending_wal = ctx->processing_required; + if (ctx->processing_required) + { + last_pending_wal = ctx->reader->ReadRecPtr; + + /* + * If we find a decodable WAL after the scan_cutoff_lsn point, + * we can terminate the scan early. + */ + if (last_pending_wal >= scan_cutoff_lsn) + break; + + /* Reset the flag and continue checking */ + ctx->processing_required = false; + } CHECK_FOR_INTERRUPTS(); } @@ -2055,7 +2073,7 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) } PG_END_TRY(); - return has_pending_wal; + return last_pending_wal; } /* diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 697143aec44..b505a6b4fee 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -282,11 +282,12 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS) * upgraded without data loss. */ Datum -binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) +binary_upgrade_check_logical_slot_pending_wal(PG_FUNCTION_ARGS) { Name slot_name; XLogRecPtr end_of_wal; - bool found_pending_wal; + XLogRecPtr scan_cutoff_lsn; + XLogRecPtr last_pending_wal; CHECK_IS_BINARY_UPGRADE; @@ -297,6 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) Assert(has_rolreplication(GetUserId())); slot_name = PG_GETARG_NAME(0); + scan_cutoff_lsn = PG_GETARG_LSN(1); /* Acquire the given slot */ ReplicationSlotAcquire(NameStr(*slot_name), true, true); @@ -307,12 +309,16 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); end_of_wal = GetFlushRecPtr(NULL); - found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal); + last_pending_wal = LogicalReplicationSlotCheckPendingWal(end_of_wal, + scan_cutoff_lsn); /* Clean up */ ReplicationSlotRelease(); - PG_RETURN_BOOL(!found_pending_wal); + if (XLogRecPtrIsValid(last_pending_wal)) + PG_RETURN_LSN(last_pending_wal); + else + PG_RETURN_NULL(); } /* diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index a8d20a92a98..5c73773bf0e 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -622,7 +622,7 @@ check_and_dump_old_cluster(void) { /* * Logical replication slots can be migrated since PG17. See comments - * atop get_old_cluster_logical_slot_infos(). + * in get_db_rel_and_slot_infos(). */ check_old_cluster_for_valid_slots(); diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 47e8d1039a2..ad4b1530e6d 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -29,7 +29,7 @@ static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static char *get_old_cluster_logical_slot_infos_query(void); +static const char *get_old_cluster_logical_slot_infos_query(ClusterInfo *cluster); static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg); @@ -281,7 +281,6 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) { UpgradeTask *task = upgrade_task_create(); char *rel_infos_query = NULL; - char *logical_slot_infos_query = NULL; if (cluster->dbarr.dbs != NULL) free_db_and_rel_infos(&cluster->dbarr); @@ -306,20 +305,15 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) */ if (cluster == &old_cluster && GET_MAJOR_VERSION(cluster->major_version) > 1600) - { - logical_slot_infos_query = get_old_cluster_logical_slot_infos_query(); upgrade_task_add_step(task, - logical_slot_infos_query, + get_old_cluster_logical_slot_infos_query(cluster), process_old_cluster_logical_slot_infos, true, NULL); - } upgrade_task_run(task, cluster); upgrade_task_free(task); pg_free(rel_infos_query); - if (logical_slot_infos_query) - pg_free(logical_slot_infos_query); if (cluster == &old_cluster) pg_log(PG_VERBOSE, "\nsource databases:"); @@ -681,17 +675,15 @@ process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg) * get_db_rel_and_slot_infos()'s UpgradeTask. The status of each logical slot * is checked in check_old_cluster_for_valid_slots(). */ -static char * -get_old_cluster_logical_slot_infos_query(void) +static const char * +get_old_cluster_logical_slot_infos_query(ClusterInfo *cluster) { /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This * regards the slot as caught up if we don't find any decodable changes. - * See binary_upgrade_logical_slot_has_caught_up(). - * - * Note that we can't ensure whether the slot is caught up during - * live_check as the new WAL records could be generated. + * The implementation of this check varies depending on the server + * version. * * We intentionally skip checking the WALs for invalidated slots as the * corresponding WALs could have been removed for such slots. @@ -701,21 +693,80 @@ get_old_cluster_logical_slot_infos_query(void) * started and stopped several times causing any temporary slots to be * removed. */ - return psprintf("SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? "FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); + + if (user_opts.live_check) + { + /* + * We skip the caught-up check during live_check. We cannot verify + * whether the slot is caught up in this mode, as new WAL records + * could be generated concurrently. + */ + return "SELECT slot_name, plugin, two_phase, failover, " + "FALSE as caught_up, " + "invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE"; + } + else if (GET_MAJOR_VERSION(cluster->major_version) >= 1900) + { + /* + * For PG19 and later, we optimize the slot caught-up check to avoid + * reading the same WAL stream multiple times: execute the caught-up + * check only for the slot with the minimum confirmed_flush_lsn, and + * apply the same result to all other slots in the same database. This + * limits the check to at most one logical slot per database. We also + * use the maximum confirmed_flush_lsn among all logical slots on the + * database as an early scan cutoff; finding a decodable WAL record + * beyond this point implies that no slot has caught up. + * + * Note that we don't distinguish slots based on their output plugin. + * If a plugin applies replication origin filters, we might get a + * false positive (i.e., erroneously considering a slot caught up). + * However, such cases are very rare, and the impact of a false + * positive is minimal. + */ + return "WITH check_caught_up AS ( " + " SELECT pg_catalog.binary_upgrade_check_logical_slot_pending_wal(slot_name, " + " MAX(confirmed_flush_lsn) OVER ()) as last_pending_wal " + " FROM pg_replication_slots " + " WHERE slot_type = 'logical' AND " + " database = current_database() AND " + " temporary IS FALSE AND " + " invalidation_reason IS NULL " + " ORDER BY confirmed_flush_lsn ASC " + " LIMIT 1 " + ") " + "SELECT slot_name, plugin, two_phase, failover, " + "CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE last_pending_wal IS NULL OR " + " confirmed_flush_lsn > last_pending_wal " + "END as caught_up, " + "invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots, check_caught_up " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE "; + } + + /* + * For PG18 and earlier, we call + * binary_upgrade_logical_slot_has_caught_up() for each logical slot. + */ + return "SELECT slot_name, plugin, two_phase, failover, " + "CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " + "END as caught_up, " + "invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE "; } /* - * Callback function for processing results of the query returned by - * get_old_cluster_logical_slot_infos_query(), which is used for + * Callback function for processing results of the query, which is used for * get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical * slot information for later use. */ @@ -768,7 +819,7 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) * * Note: this function always returns 0 if the old_cluster is PG16 and prior * because we gather slot information only for cluster versions greater than or - * equal to PG17. See get_old_cluster_logical_slot_infos(). + * equal to PG17. See get_db_rel_and_slot_infos(). */ int count_old_cluster_logical_slots(void) diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl index b9abc3a2e21..15e6d267f2f 100644 --- a/src/bin/pg_upgrade/t/003_logical_slots.pl +++ b/src/bin/pg_upgrade/t/003_logical_slots.pl @@ -64,6 +64,7 @@ $oldpub->safe_psql( 'postgres', qq[ SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding'); SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding'); + SELECT pg_create_logical_replication_slot('test_slot3', 'test_decoding'); ]); $oldpub->stop(); @@ -77,7 +78,7 @@ command_checks_all( [@pg_upgrade_cmd], 1, [ - qr/"max_replication_slots" \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/ + qr/"max_replication_slots" \(1\) must be greater than or equal to the number of logical replication slots \(3\) on the old cluster/ ], [qr//], 'run of pg_upgrade where the new cluster has insufficient "max_replication_slots"' @@ -85,29 +86,31 @@ command_checks_all( ok(-d $newpub->data_dir . "/pg_upgrade_output.d", "pg_upgrade_output.d/ not removed after pg_upgrade failure"); -# Set 'max_replication_slots' to match the number of slots (2) present on the +# Set 'max_replication_slots' to match the number of slots (3) present on the # old cluster. Both slots will be used for subsequent tests. -$newpub->append_conf('postgresql.conf', "max_replication_slots = 2"); +$newpub->append_conf('postgresql.conf', "max_replication_slots = 3"); # ------------------------------ # TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records # Preparations for the subsequent test: -# 1. Generate extra WAL records. At this point neither test_slot1 nor -# test_slot2 has consumed them. +# 1. Generate extra WAL records. At this point none of the slots has consumed them. # # 2. Advance the slot test_slot2 up to the current WAL location, but test_slot1 # still has unconsumed WAL records. # # 3. Emit a non-transactional message. This will cause test_slot2 to detect the # unconsumed WAL record. +# +# 4. Advance the slot test_slots3 up to the current WAL location. $oldpub->start; $oldpub->safe_psql( 'postgres', qq[ CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a; SELECT pg_replication_slot_advance('test_slot2', pg_current_wal_lsn()); - SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message'); + SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message', true); + SELECT pg_replication_slot_advance('test_slot3', pg_current_wal_lsn()); ]); $oldpub->stop; @@ -138,8 +141,9 @@ find( }, $newpub->data_dir . "/pg_upgrade_output.d"); -# Check the file content. Both slots should be reporting that they have -# unconsumed WAL records. +# Check the file content. While both test_slot1 and test_slot2 should be reporting +# that they have unconsumed WAL records, test_slot3 should not be reported as +# it has caught up. like( slurp_file($slots_filename), qr/The slot \"test_slot1\" has not consumed the WAL yet/m, @@ -148,6 +152,10 @@ like( slurp_file($slots_filename), qr/The slot \"test_slot2\" has not consumed the WAL yet/m, 'the previous test failed due to unconsumed WALs'); +unlike( + slurp_file($slots_filename), + qr/test_slot3/m, + 'caught-up slot is not reported'); # ------------------------------ @@ -162,6 +170,7 @@ $oldpub->safe_psql( 'postgres', qq[ SELECT * FROM pg_drop_replication_slot('test_slot1'); SELECT * FROM pg_drop_replication_slot('test_slot2'); + SELECT * FROM pg_drop_replication_slot('test_slot3'); CREATE PUBLICATION regress_pub FOR ALL TABLES; ]); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index fb577026666..a09d8a6c645 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202601261 +#define CATALOG_VERSION_NO 202602051 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 5e5e33f64fc..83f6501df38 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11832,9 +11832,9 @@ proparallel => 'u', prorettype => 'void', proargtypes => 'oid', prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' }, { oid => '6312', descr => 'for use by pg_upgrade', - proname => 'binary_upgrade_logical_slot_has_caught_up', provolatile => 'v', - proparallel => 'u', prorettype => 'bool', proargtypes => 'name', - prosrc => 'binary_upgrade_logical_slot_has_caught_up' }, + proname => 'binary_upgrade_check_logical_slot_pending_wal', provolatile => 'v', + proparallel => 'u', prorettype => 'pg_lsn', proargtypes => 'name pg_lsn', + prosrc => 'binary_upgrade_check_logical_slot_pending_wal' }, { oid => '6319', descr => 'for use by pg_upgrade (relation for pg_subscription_rel)', proname => 'binary_upgrade_add_sub_rel_state', proisstrict => 'f', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 7f03537bda7..bc9d4ece672 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -148,7 +148,8 @@ extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginI extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); -extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal); +extern XLogRecPtr LogicalReplicationSlotCheckPendingWal(XLogRecPtr end_of_wal, + XLogRecPtr scan_cutoff_lsn); extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot);