Add CONCURRENTLY option to REPACK

When this flag is specified, REPACK no longer acquires access-exclusive
lock while the new copy of the table is being created; instead, it
creates the initial copy under share-update-exclusive lock only (same as
vacuum, etc), and it follows an MVCC snapshot; it sets up a replication
slot starting at that snapshot, and uses a concurrent background worker
to do logical decoding starting at the snapshot to populate a stash of
concurrent data changes.  Those changes can then be re-applied to the
new copy of the table just before swapping the relfilenodes.
Applications can continue to access the original copy of the table
normally until just before the swap, which is the only point at which
the access-exclusive lock is needed.

There are some loose ends in this commit:
1. concurrent repack needs its own replication slot in order to perform
   logical decoding; replication slots are a scarce resource and easy to
   run out of.
2. due to the way the historic snapshot is initially set up, only one
   REPACK process can be running at any one time on the whole system.
3. there's a danger of deadlocking (and thus abort) due to the lock
   upgrade required at the final phase.

These issues will be addressed in upcoming commits.

The design and most of the code are by Antonin Houska, heavily based on
his own pg_squeeze third-party implementation.

Author: Antonin Houska <ah@cybertec.at>
Co-authored-by: Mihail Nikalayeu <mihailnikalayeu@gmail.com>
Co-authored-by: Álvaro Herrera <alvherre@kurilemu.de>
Reviewed-by: Matthias van de Meent <boekewurm+postgres@gmail.com>
Reviewed-by: Srinath Reddy Sadipiralla <srinath2133@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Jim Jones <jim.jones@uni-muenster.de>
Reviewed-by: Robert Treat <rob@xzilla.net>
Reviewed-by: Noriyoshi Shinoda <noriyoshi.shinoda@hpe.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Discussion: https://postgr.es/m/5186.1706694913@antos
Discussion: https://postgr.es/m/202507262156.sb455angijk6@alvherre.pgsql
Álvaro Herrera 2026-04-06 21:55:08 +02:00
parent 10484c2cc7
commit 28d534e2ae
GPG key ID: 1C20ACB9D5C564AE
46 changed files with 3689 additions and 267 deletions


@@ -6990,14 +6990,35 @@ FROM pg_stat_get_backend_idset() AS backendid;
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>heap_tuples_written</structfield> <type>bigint</type>
<structfield>heap_tuples_inserted</structfield> <type>bigint</type>
</para>
<para>
Number of heap tuples written.
Number of heap tuples inserted.
This counter only advances when the phase is
<literal>seq scanning heap</literal>,
<literal>index scanning heap</literal>
or <literal>writing new heap</literal>.
<literal>index scanning heap</literal>,
<literal>writing new heap</literal>
or <literal>catch-up</literal>.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>heap_tuples_updated</structfield> <type>bigint</type>
</para>
<para>
Number of heap tuples updated.
This counter only advances when the phase is <literal>catch-up</literal>.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>heap_tuples_deleted</structfield> <type>bigint</type>
</para>
<para>
Number of heap tuples deleted.
This counter only advances when the phase is <literal>catch-up</literal>.
</para></entry>
</row>
@@ -7078,6 +7099,14 @@ FROM pg_stat_get_backend_idset() AS backendid;
<command>REPACK</command> is currently writing the new heap.
</entry>
</row>
<row>
<entry><literal>catch-up</literal></entry>
<entry>
<command>REPACK CONCURRENTLY</command> is currently processing the DML
commands that other transactions executed during any of the preceding
phases.
</entry>
</row>
<row>
<entry><literal>swapping relation files</literal></entry>
<entry>


@@ -1845,15 +1845,17 @@ SELECT pg_advisory_lock(q.id) FROM
<title>Caveats</title>
<para>
Some DDL commands, currently only <link linkend="sql-truncate"><command>TRUNCATE</command></link> and the
table-rewriting forms of <link linkend="sql-altertable"><command>ALTER TABLE</command></link>, are not
Some commands, currently only <link linkend="sql-truncate"><command>TRUNCATE</command></link>, the
table-rewriting forms of <link linkend="sql-altertable"><command>ALTER
TABLE</command></link> and <command>REPACK</command> with
the <literal>CONCURRENTLY</literal> option, are not
MVCC-safe. This means that after the truncation or rewrite commits, the
table will appear empty to concurrent transactions, if they are using a
snapshot taken before the DDL command committed. This will only be an
snapshot taken before the command committed. This will only be an
issue for a transaction that did not access the table in question
before the DDL command started &mdash; any transaction that has done so
before the command started &mdash; any transaction that has done so
would hold at least an <literal>ACCESS SHARE</literal> table lock,
which would block the DDL command until that transaction completes.
which would block the truncating or rewriting command until that transaction completes.
So these commands will not cause any apparent inconsistency in the
table contents for successive queries on the target table, but they
could cause visible inconsistency between the contents of the target


@@ -28,6 +28,7 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
VERBOSE [ <replaceable class="parameter">boolean</replaceable> ]
ANALYZE [ <replaceable class="parameter">boolean</replaceable> ]
CONCURRENTLY [ <replaceable class="parameter">boolean</replaceable> ]
<phrase>and <replaceable class="parameter">table_and_columns</replaceable> is:</phrase>
@@ -54,7 +55,8 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
processes every table and materialized view in the current database that
the current user has the <literal>MAINTAIN</literal> privilege on. This
form of <command>REPACK</command> cannot be executed inside a transaction
block.
block. Also, this form is not allowed if
the <literal>CONCURRENTLY</literal> option is used.
</para>
<para>
@@ -67,7 +69,8 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
When a table is being repacked, an <literal>ACCESS EXCLUSIVE</literal> lock
is acquired on it. This prevents any other database operations (both reads
and writes) from operating on the table until the <command>REPACK</command>
is finished.
is finished. If you want to keep the table accessible during the repacking,
consider using the <literal>CONCURRENTLY</literal> option.
</para>
<refsect2 id="sql-repack-notes-on-clustering" xreflabel="Notes on Clustering">
@@ -198,6 +201,117 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
</listitem>
</varlistentry>
<varlistentry>
<term><literal>CONCURRENTLY</literal></term>
<listitem>
<para>
Allow other transactions to use the table while it is being repacked.
</para>
<para>
Internally, <command>REPACK</command> copies the contents of the table
(ignoring dead tuples) into a new file, sorted by the specified index,
and also creates a new file for each index. Then it swaps the old and
new files for the table and all the indexes, and deletes the old
files. The <literal>ACCESS EXCLUSIVE</literal> lock is needed to make
sure that the old files do not change during processing, because any
such changes would be lost in the swap.
</para>
<para>
With the <literal>CONCURRENTLY</literal> option, the <literal>ACCESS
EXCLUSIVE</literal> lock is only acquired to swap the table and index
files. The data changes that took place during the creation of the new
table and index files are captured using logical decoding
(<xref linkend="logicaldecoding"/>) and applied before
the <literal>ACCESS EXCLUSIVE</literal> lock is requested. Thus the lock
is typically held only for the time needed to swap the files, which
should be fairly short. However, the time might still be noticeable if
many data changes were made to the table while
<command>REPACK</command> was waiting for the lock: those changes must
be processed just before the files are swapped, while the
<literal>ACCESS EXCLUSIVE</literal> lock is being held.
</para>
<para>
Note that <command>REPACK</command> with the
<literal>CONCURRENTLY</literal> option does not try to order the rows
inserted into the table after the repacking started. Also note
that <command>REPACK</command> might fail to complete due to DDL
commands executed on the table by other transactions during the
repacking.
</para>
<note>
<para>
In addition to the temporary space requirements explained in
<xref linkend="sql-repack-notes-on-resources"/>,
the <literal>CONCURRENTLY</literal> option can further increase the
use of temporary space. The reason is that other transactions can
perform DML operations which cannot be applied to the new file until
<command>REPACK</command> has copied all the existing tuples from the
old file. Thus the tuples inserted into the old file during the copying
are also stored separately in a temporary file, until they can be
processed.
</para>
</note>
<para>
The <literal>CONCURRENTLY</literal> option cannot be used in the
following cases:
<itemizedlist>
<listitem>
<para>
The table is <literal>UNLOGGED</literal>.
</para>
</listitem>
<listitem>
<para>
The table is partitioned.
</para>
</listitem>
<listitem>
<para>
The table has neither a primary key nor an index-based replica identity.
</para>
</listitem>
<listitem>
<para>
The table is a system catalog or a <acronym>TOAST</acronym> table.
</para>
</listitem>
<listitem>
<para>
<command>REPACK</command> is executed inside a transaction block.
</para>
</listitem>
<listitem>
<para>
The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
configuration parameter does not allow for creation of an additional
replication slot.
</para>
</listitem>
</itemizedlist>
</para>
<warning>
<para>
<command>REPACK</command> with the <literal>CONCURRENTLY</literal>
option is not MVCC-safe, see <xref linkend="mvcc-caveats"/> for
details.
</para>
</warning>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>VERBOSE</literal></term>
<listitem>


@@ -23,6 +23,7 @@ SUBDIRS = \
interfaces \
backend/replication/libpqwalreceiver \
backend/replication/pgoutput \
backend/replication/pgrepack \
fe_utils \
bin \
pl \


@@ -61,7 +61,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup,
HeapTuple newtup, HeapTuple old_key_tuple,
bool all_visible_cleared, bool new_all_visible_cleared);
bool all_visible_cleared, bool new_all_visible_cleared,
bool walLogical);
#ifdef USE_ASSERT_CHECKING
static void check_lock_if_inplace_updateable_rel(Relation relation,
const ItemPointerData *otid,
@@ -2716,6 +2717,7 @@ heap_delete(Relation relation, const ItemPointerData *tid,
uint16 new_infomask,
new_infomask2;
bool changingPart = (options & TABLE_DELETE_CHANGING_PARTITION) != 0;
bool walLogical = (options & TABLE_DELETE_NO_LOGICAL) == 0;
bool have_tuple_lock = false;
bool iscombo;
bool all_visible_cleared = false;
@@ -2950,7 +2952,8 @@ l1:
* Compute replica identity tuple before entering the critical section so
* we don't PANIC upon a memory allocation failure.
*/
old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
old_key_tuple = walLogical ?
ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL;
/*
* If this is the first possibly-multixact-able operation in the current
@@ -3040,6 +3043,16 @@ l1:
xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
}
/*
* Mark the change as not-for-logical-decoding if caller requested so.
*
* (This is used for changes that affect relations not visible to
* other transactions, such as the transient table during concurrent
* repack.)
*/
if (!walLogical)
xlrec.flags |= XLH_DELETE_NO_LOGICAL;
XLogBeginInsert();
XLogRegisterData(&xlrec, SizeOfHeapDelete);
@@ -3190,6 +3203,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
HeapTuple heaptup;
HeapTuple old_key_tuple = NULL;
bool old_key_copied = false;
bool walLogical = (options & TABLE_UPDATE_NO_LOGICAL) == 0;
Page page,
newpage;
BlockNumber block;
@@ -4071,7 +4085,8 @@ l2:
newbuf, &oldtup, heaptup,
old_key_tuple,
all_visible_cleared,
all_visible_cleared_new);
all_visible_cleared_new,
walLogical);
if (newbuf != buffer)
{
PageSetLSN(newpage, recptr);
@@ -8747,7 +8762,8 @@ static XLogRecPtr
log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
HeapTuple old_key_tuple,
bool all_visible_cleared, bool new_all_visible_cleared)
bool all_visible_cleared, bool new_all_visible_cleared,
bool walLogical)
{
xl_heap_update xlrec;
xl_heap_header xlhdr;
@@ -8758,7 +8774,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
suffixlen = 0;
XLogRecPtr recptr;
Page page = BufferGetPage(newbuf);
bool need_tuple_data = RelationIsLogicallyLogged(reln);
bool need_tuple_data = walLogical && RelationIsLogicallyLogged(reln);
bool init;
int bufflags;


@@ -51,6 +51,11 @@
static void reform_and_rewrite_tuple(HeapTuple tuple,
Relation OldHeap, Relation NewHeap,
Datum *values, bool *isnull, RewriteState rwstate);
static void heap_insert_for_repack(HeapTuple tuple, Relation OldHeap,
Relation NewHeap, Datum *values, bool *isnull,
BulkInsertState bistate);
static HeapTuple reform_tuple(HeapTuple tuple, Relation OldHeap,
Relation NewHeap, Datum *values, bool *isnull);
static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
HeapTuple tuple,
@@ -589,6 +594,7 @@ static void
heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
Relation OldIndex, bool use_sort,
TransactionId OldestXmin,
Snapshot snapshot,
TransactionId *xid_cutoff,
MultiXactId *multi_cutoff,
double *num_tuples,
@@ -596,6 +602,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
double *tups_recently_dead)
{
RewriteState rwstate;
BulkInsertState bistate;
IndexScanDesc indexScan;
TableScanDesc tableScan;
HeapScanDesc heapScan;
@@ -609,6 +616,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
bool *isnull;
BufferHeapTupleTableSlot *hslot;
BlockNumber prev_cblock = InvalidBlockNumber;
bool concurrent = snapshot != NULL;
/* Remember if it's a system catalog */
is_system_catalog = IsSystemRelation(OldHeap);
@@ -624,10 +632,21 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
values = palloc_array(Datum, natts);
isnull = palloc_array(bool, natts);
/* Initialize the rewrite operation */
rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
*multi_cutoff);
/*
* In non-concurrent mode, initialize the rewrite operation. This is not
* needed in concurrent mode.
*/
if (!concurrent)
rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin,
*xid_cutoff, *multi_cutoff);
else
rwstate = NULL;
/* In concurrent mode, prepare for bulk-insert operation. */
if (concurrent)
bistate = GetBulkInsertState();
else
bistate = NULL;
/* Set up sorting if wanted */
if (use_sort)
@@ -641,6 +660,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
* Prepare to scan the OldHeap. To ensure we see recently-dead tuples
* that still need to be copied, we scan with SnapshotAny and use
* HeapTupleSatisfiesVacuum for the visibility test.
*
* In the CONCURRENTLY case, we do regular MVCC visibility tests, using
* the snapshot passed by the caller.
*/
if (OldIndex != NULL && !use_sort)
{
@@ -657,7 +679,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
tableScan = NULL;
heapScan = NULL;
indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, 0,
indexScan = index_beginscan(OldHeap, OldIndex,
snapshot ? snapshot : SnapshotAny,
NULL, 0, 0,
SO_NONE);
index_rescan(indexScan, NULL, 0, NULL, 0);
}
@@ -667,7 +691,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
PROGRESS_REPACK_PHASE_SEQ_SCAN_HEAP);
tableScan = table_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL,
tableScan = table_beginscan(OldHeap,
snapshot ? snapshot : SnapshotAny,
0, (ScanKey) NULL,
SO_NONE);
heapScan = (HeapScanDesc) tableScan;
indexScan = NULL;
@@ -744,83 +770,94 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
buf = hslot->buffer;
/*
* To be able to guarantee that we can set the hint bit, acquire an
* exclusive lock on the old buffer. We need the hint bits, set in
* heapam_relation_copy_for_cluster() -> HeapTupleSatisfiesVacuum(),
* to be set, as otherwise reform_and_rewrite_tuple() ->
* rewrite_heap_tuple() will get confused. Specifically,
* rewrite_heap_tuple() checks for HEAP_XMAX_INVALID in the old tuple
* to determine whether to check the old-to-new mapping hash table.
*
* It'd be better if we somehow could avoid setting hint bits on the
* old page. One reason to use VACUUM FULL are very bloated tables -
* rewriting most of the old table during VACUUM FULL doesn't exactly
* help...
* In concurrent mode, our table or index scan uses a regular MVCC
* visibility test against the snapshot passed by the caller; therefore
* we don't need another visibility test.  In non-concurrent mode
* however, we must test the visibility of each tuple we read.
*/
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))
if (!concurrent)
{
case HEAPTUPLE_DEAD:
/* Definitely dead */
isdead = true;
break;
case HEAPTUPLE_RECENTLY_DEAD:
*tups_recently_dead += 1;
pg_fallthrough;
case HEAPTUPLE_LIVE:
/* Live or recently dead, must copy it */
isdead = false;
break;
case HEAPTUPLE_INSERT_IN_PROGRESS:
/*
* To be able to guarantee that we can set the hint bit, acquire
* an exclusive lock on the old buffer. We need the hint bits, set
* in heapam_relation_copy_for_cluster() ->
* HeapTupleSatisfiesVacuum(), to be set, as otherwise
* reform_and_rewrite_tuple() -> rewrite_heap_tuple() will get
* confused. Specifically, rewrite_heap_tuple() checks for
* HEAP_XMAX_INVALID in the old tuple to determine whether to
* check the old-to-new mapping hash table.
*
* It'd be better if we somehow could avoid setting hint bits on
* the old page. One reason to use VACUUM FULL are very bloated
* tables - rewriting most of the old table during VACUUM FULL
* doesn't exactly help...
*/
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
/*
* Since we hold exclusive lock on the relation, normally the
* only way to see this is if it was inserted earlier in our
* own transaction. However, it can happen in system
* catalogs, since we tend to release write lock before commit
* there. Give a warning if neither case applies; but in any
* case we had better copy it.
*/
if (!is_system_catalog &&
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data)))
elog(WARNING, "concurrent insert in progress within table \"%s\"",
RelationGetRelationName(OldHeap));
/* treat as live */
isdead = false;
break;
case HEAPTUPLE_DELETE_IN_PROGRESS:
/*
* Similar situation to INSERT_IN_PROGRESS case.
*/
if (!is_system_catalog &&
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data)))
elog(WARNING, "concurrent delete in progress within table \"%s\"",
RelationGetRelationName(OldHeap));
/* treat as recently dead */
*tups_recently_dead += 1;
isdead = false;
break;
default:
elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
isdead = false; /* keep compiler quiet */
break;
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (isdead)
{
*tups_vacuumed += 1;
/* heap rewrite module still needs to see it... */
if (rewrite_heap_dead_tuple(rwstate, tuple))
switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))
{
/* A previous recently-dead tuple is now known dead */
*tups_vacuumed += 1;
*tups_recently_dead -= 1;
case HEAPTUPLE_DEAD:
/* Definitely dead */
isdead = true;
break;
case HEAPTUPLE_RECENTLY_DEAD:
*tups_recently_dead += 1;
pg_fallthrough;
case HEAPTUPLE_LIVE:
/* Live or recently dead, must copy it */
isdead = false;
break;
case HEAPTUPLE_INSERT_IN_PROGRESS:
/*
* As long as we hold exclusive lock on the relation,
* normally the only way to see this is if it was inserted
* earlier in our own transaction. However, it can happen
* in system catalogs, since we tend to release write lock
* before commit there. Give a warning if neither case
* applies; but in any case we had better copy it.
*/
if (!is_system_catalog &&
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data)))
elog(WARNING, "concurrent insert in progress within table \"%s\"",
RelationGetRelationName(OldHeap));
/* treat as live */
isdead = false;
break;
case HEAPTUPLE_DELETE_IN_PROGRESS:
/*
* Similar situation to INSERT_IN_PROGRESS case.
*/
if (!is_system_catalog &&
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data)))
elog(WARNING, "concurrent delete in progress within table \"%s\"",
RelationGetRelationName(OldHeap));
/* treat as recently dead */
*tups_recently_dead += 1;
isdead = false;
break;
default:
elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
isdead = false; /* keep compiler quiet */
break;
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (isdead)
{
*tups_vacuumed += 1;
/* heap rewrite module still needs to see it... */
if (rewrite_heap_dead_tuple(rwstate, tuple))
{
/* A previous recently-dead tuple is now known dead */
*tups_vacuumed += 1;
*tups_recently_dead -= 1;
}
continue;
}
continue;
}
*num_tuples += 1;
@@ -839,12 +876,16 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
{
const int ct_index[] = {
PROGRESS_REPACK_HEAP_TUPLES_SCANNED,
PROGRESS_REPACK_HEAP_TUPLES_WRITTEN
PROGRESS_REPACK_HEAP_TUPLES_INSERTED
};
int64 ct_val[2];
reform_and_rewrite_tuple(tuple, OldHeap, NewHeap,
values, isnull, rwstate);
if (!concurrent)
reform_and_rewrite_tuple(tuple, OldHeap, NewHeap,
values, isnull, rwstate);
else
heap_insert_for_repack(tuple, OldHeap, NewHeap,
values, isnull, bistate);
/*
* In indexscan mode and also VACUUM FULL, report increase in
@@ -892,12 +933,17 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
break;
n_tuples += 1;
reform_and_rewrite_tuple(tuple,
OldHeap, NewHeap,
values, isnull,
rwstate);
if (!concurrent)
reform_and_rewrite_tuple(tuple,
OldHeap, NewHeap,
values, isnull,
rwstate);
else
heap_insert_for_repack(tuple, OldHeap, NewHeap,
values, isnull, bistate);
/* Report n_tuples */
pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_WRITTEN,
pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED,
n_tuples);
}
@@ -905,7 +951,10 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
}
/* Write out any remaining tuples, and fsync if needed */
end_heap_rewrite(rwstate);
if (rwstate)
end_heap_rewrite(rwstate);
if (bistate)
FreeBulkInsertState(bistate);
/* Clean up */
pfree(values);
@@ -2303,27 +2352,84 @@ static void
reform_and_rewrite_tuple(HeapTuple tuple,
Relation OldHeap, Relation NewHeap,
Datum *values, bool *isnull, RewriteState rwstate)
{
HeapTuple newtuple;
newtuple = reform_tuple(tuple, OldHeap, NewHeap, values, isnull);
/* The heap rewrite module does the rest */
rewrite_heap_tuple(rwstate, tuple, newtuple);
heap_freetuple(newtuple);
}
/*
* Insert tuple when processing REPACK CONCURRENTLY.
*
* rewriteheap.c is not used in the CONCURRENTLY case because it'd be
difficult to do the same in the catch-up phase (as logical
decoding does not provide us with sufficient visibility
* information). Thus we must use heap_insert() both during the
* catch-up and here.
*
* We pass the NO_LOGICAL flag to heap_insert() in order to skip logical
* decoding: as soon as REPACK CONCURRENTLY swaps the relation files, it drops
* this relation, so no logical replication subscription should need the data.
*
* BulkInsertState is used because many tuples are inserted in the typical
* case.
*/
static void
heap_insert_for_repack(HeapTuple tuple, Relation OldHeap, Relation NewHeap,
Datum *values, bool *isnull, BulkInsertState bistate)
{
HeapTuple newtuple;
newtuple = reform_tuple(tuple, OldHeap, NewHeap, values, isnull);
heap_insert(NewHeap, newtuple, GetCurrentCommandId(true),
HEAP_INSERT_NO_LOGICAL, bistate);
heap_freetuple(newtuple);
}
/*
* Subroutine for reform_and_rewrite_tuple and heap_insert_for_repack.
*
* Deform the given tuple, set values of dropped columns to NULL, form a new
* tuple and return it. If no attributes need to be changed in this way, a
* copy of the original tuple is returned. Caller is responsible for freeing
* the returned tuple.
*
* XXX this coding assumes that both relations have the same tupledesc.
*/
static HeapTuple
reform_tuple(HeapTuple tuple, Relation OldHeap, Relation NewHeap,
Datum *values, bool *isnull)
{
TupleDesc oldTupDesc = RelationGetDescr(OldHeap);
TupleDesc newTupDesc = RelationGetDescr(NewHeap);
HeapTuple copiedTuple;
int i;
bool needs_reform = false;
/* Skip work if the tuple doesn't need any attributes changed */
for (int i = 0; i < newTupDesc->natts; i++)
{
if (TupleDescCompactAttr(newTupDesc, i)->attisdropped &&
!heap_attisnull(tuple, i + 1, newTupDesc))
needs_reform = true;
}
if (!needs_reform)
return heap_copytuple(tuple);
heap_deform_tuple(tuple, oldTupDesc, values, isnull);
/* Be sure to null out any dropped columns */
for (i = 0; i < newTupDesc->natts; i++)
for (int i = 0; i < newTupDesc->natts; i++)
{
if (TupleDescCompactAttr(newTupDesc, i)->attisdropped)
isnull[i] = true;
}
copiedTuple = heap_form_tuple(newTupDesc, values, isnull);
/* The heap rewrite module does the rest */
rewrite_heap_tuple(rwstate, tuple, copiedTuple);
heap_freetuple(copiedTuple);
return heap_form_tuple(newTupDesc, values, isnull);
}
/*


@@ -621,9 +621,9 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
uint32 options = HEAP_INSERT_SKIP_FSM;
/*
* While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
* for the TOAST table are not logically decoded. The main heap is
* WAL-logged as XLOG FPI records, which are not logically decoded.
* While rewriting the heap for REPACK, make sure data for the TOAST
* table are not logically decoded. The main heap is WAL-logged as
* XLOG FPI records, which are not logically decoded.
*/
options |= HEAP_INSERT_NO_LOGICAL;


@@ -1343,16 +1343,19 @@ CREATE VIEW pg_stat_progress_repack AS
WHEN 2 THEN 'index scanning heap'
WHEN 3 THEN 'sorting tuples'
WHEN 4 THEN 'writing new heap'
WHEN 5 THEN 'swapping relation files'
WHEN 6 THEN 'rebuilding index'
WHEN 7 THEN 'performing final cleanup'
WHEN 5 THEN 'catch-up'
WHEN 6 THEN 'swapping relation files'
WHEN 7 THEN 'rebuilding index'
WHEN 8 THEN 'performing final cleanup'
END AS phase,
CAST(S.param3 AS oid) AS repack_index_relid,
S.param4 AS heap_tuples_scanned,
S.param5 AS heap_tuples_written,
S.param6 AS heap_blks_total,
S.param7 AS heap_blks_scanned,
S.param8 AS index_rebuild_count
S.param5 AS heap_tuples_inserted,
S.param6 AS heap_tuples_updated,
S.param7 AS heap_tuples_deleted,
S.param8 AS heap_blks_total,
S.param9 AS heap_blks_scanned,
S.param10 AS index_rebuild_count
FROM pg_stat_get_progress_info('REPACK') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;
@@ -1370,7 +1373,7 @@ CREATE VIEW pg_stat_progress_cluster AS
phase,
repack_index_relid AS cluster_index_relid,
heap_tuples_scanned,
heap_tuples_written,
heap_tuples_inserted + heap_tuples_updated AS heap_tuples_written,
heap_blks_total,
heap_blks_scanned,
index_rebuild_count


@@ -51,6 +51,7 @@ OBJS = \
propgraphcmds.o \
publicationcmds.o \
repack.o \
repack_worker.o \
schemacmds.o \
seclabel.o \
sequence.o \


@@ -893,6 +893,7 @@ static void
refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
{
finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
true, /* reindex */
RecentXmin, ReadNextMultiXactId(), relpersistence);
}


@@ -39,6 +39,7 @@ backend_sources += files(
'propgraphcmds.c',
'publicationcmds.c',
'repack.c',
'repack_worker.c',
'schemacmds.c',
'seclabel.c',
'sequence.c',

File diff suppressed because it is too large


@@ -0,0 +1,533 @@
/*-------------------------------------------------------------------------
*
* repack_worker.c
* Implementation of the background worker for ad-hoc logical decoding
* during REPACK (CONCURRENTLY).
*
*
* Copyright (c) 2026, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
* src/backend/commands/repack_worker.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/table.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "access/xlogwait.h"
#include "commands/repack.h"
#include "commands/repack_internal.h"
#include "libpq/pqmq.h"
#include "replication/snapbuild.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
#define REPL_PLUGIN_NAME "pgrepack"
static void RepackWorkerShutdown(int code, Datum arg);
static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
static void export_initial_snapshot(Snapshot snapshot,
DecodingWorkerShared *shared);
static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
DecodingWorkerShared *shared);
/* Is this process a REPACK worker? */
static bool am_repack_worker = false;
/* The WAL segment being decoded. */
static XLogSegNo repack_current_segment = 0;
/*
* Keep track of the table we're processing, to skip logical decoding of data
* from other relations.
*/
static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
/* REPACK decoding worker entry point */
void
RepackWorkerMain(Datum main_arg)
{
dsm_segment *seg;
DecodingWorkerShared *shared;
shm_mq *mq;
shm_mq_handle *mqh;
LogicalDecodingContext *decoding_ctx;
SharedFileSet *sfs;
Snapshot snapshot;
am_repack_worker = true;
/*
* Override the default bgworker_die() with die() so we can use
* CHECK_FOR_INTERRUPTS().
*/
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
seg = dsm_attach(DatumGetUInt32(main_arg));
if (seg == NULL)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not map dynamic shared memory segment"));
shared = (DecodingWorkerShared *) dsm_segment_address(seg);
shared->dsm_seg = seg;
/* Arrange to signal the leader if we exit. */
before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
/*
* Join locking group - see the comments around the call of
* start_repack_decoding_worker().
*/
if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
return; /* The leader is not running anymore. */
/*
* Setup a queue to send error messages to the backend that launched this
* worker.
*/
mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
shm_mq_set_sender(mq, MyProc);
mqh = shm_mq_attach(mq, seg, NULL);
pq_redirect_to_shm_mq(seg, mqh);
pq_set_parallel_leader(shared->backend_pid,
shared->backend_proc_number);
/* Connect to the database. */
BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid, 0);
/*
* Transaction is needed to open relation, and it also provides us with a
* resource owner.
*/
StartTransactionCommand();
shared = (DecodingWorkerShared *) dsm_segment_address(seg);
/*
* Not sure the spinlock is needed here - the backend should not change
* anything in the shared memory until we have serialized the snapshot.
*/
SpinLockAcquire(&shared->mutex);
Assert(!XLogRecPtrIsValid(shared->lsn_upto));
sfs = &shared->sfs;
SpinLockRelease(&shared->mutex);
SharedFileSetAttach(sfs, seg);
/*
* Prepare to capture the concurrent data changes ourselves.
*/
decoding_ctx = repack_setup_logical_decoding(shared->relid);
/* Announce that we're ready. */
SpinLockAcquire(&shared->mutex);
shared->initialized = true;
SpinLockRelease(&shared->mutex);
ConditionVariableSignal(&shared->cv);
/* There doesn't seem to be a nice API to set these */
XactIsoLevel = XACT_REPEATABLE_READ;
XactReadOnly = true;
/* Build the initial snapshot and export it. */
snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
export_initial_snapshot(snapshot, shared);
/*
* Only historic snapshots should be used now. Do not let us restrict the
* progress of xmin horizon.
*/
InvalidateCatalogSnapshot();
for (;;)
{
bool stop = decode_concurrent_changes(decoding_ctx, shared);
if (stop)
break;
}
/* Cleanup. */
repack_cleanup_logical_decoding(decoding_ctx);
CommitTransactionCommand();
}
/*
* See ParallelWorkerShutdown for details.
*/
static void
RepackWorkerShutdown(int code, Datum arg)
{
DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
SendProcSignal(shared->backend_pid,
PROCSIG_REPACK_MESSAGE,
shared->backend_proc_number);
dsm_detach(shared->dsm_seg);
}
bool
AmRepackWorker(void)
{
return am_repack_worker;
}
/*
* This function is much like pg_create_logical_replication_slot() except that
* the new slot is neither released (if anyone else could read changes from
* our slot, we could miss changes other backends do while we copy the
* existing data into the temporary table), nor persisted (it's easier to
* handle a crash by restarting all the work from scratch).
*/
static LogicalDecodingContext *
repack_setup_logical_decoding(Oid relid)
{
Relation rel;
Oid toastrelid;
LogicalDecodingContext *ctx;
NameData slotname;
RepackDecodingState *dstate;
MemoryContext oldcxt;
/*
* REPACK CONCURRENTLY is not allowed in a transaction block, so this
* should never fire.
*/
Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
/*
* Make sure we can use logical decoding.
*/
CheckSlotPermissions();
CheckLogicalDecodingRequirements();
/*
* A single backend should not execute multiple REPACK commands at a time,
* so use PID to make the slot unique.
*
* RS_TEMPORARY so that the slot gets cleaned up on ERROR.
*/
snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
false);
EnsureLogicalDecodingEnabled();
/*
* Neither prepare_write nor do_write callback nor update_progress is
* useful for us.
*/
ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
NIL,
true,
InvalidXLogRecPtr,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
NULL, NULL, NULL);
/*
* We don't have control over the setting of fast_forward, so at least check it.
*/
Assert(!ctx->fast_forward);
/* Avoid logical decoding of other relations. */
rel = table_open(relid, AccessShareLock);
repacked_rel_locator = rel->rd_locator;
toastrelid = rel->rd_rel->reltoastrelid;
if (OidIsValid(toastrelid))
{
Relation toastrel;
/* Avoid logical decoding of other TOAST relations. */
toastrel = table_open(toastrelid, AccessShareLock);
repacked_rel_toast_locator = toastrel->rd_locator;
table_close(toastrel, AccessShareLock);
}
table_close(rel, AccessShareLock);
DecodingContextFindStartpoint(ctx);
/*
* decode_concurrent_changes() needs a non-blocking callback.
*/
ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
/* Some WAL records should have been read. */
Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr);
/*
* Initialize repack_current_segment so that we can notice WAL segment
* boundaries.
*/
XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
wal_segment_size);
/* Our private state belongs to the decoding context. */
oldcxt = MemoryContextSwitchTo(ctx->context);
/*
* read_local_xlog_page_no_wait() needs to be able to indicate the end of
* WAL.
*/
ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
dstate = palloc0_object(RepackDecodingState);
MemoryContextSwitchTo(oldcxt);
#ifdef USE_ASSERT_CHECKING
dstate->relid = relid;
#endif
dstate->change_cxt = AllocSetContextCreate(ctx->context,
"REPACK - change",
ALLOCSET_DEFAULT_SIZES);
/* The file will be set as soon as we have it opened. */
dstate->file = NULL;
/*
* Memory context and resource owner for long-lived resources.
*/
dstate->worker_cxt = CurrentMemoryContext;
dstate->worker_resowner = CurrentResourceOwner;
ctx->output_writer_private = dstate;
return ctx;
}
static void
repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
{
RepackDecodingState *dstate;
dstate = (RepackDecodingState *) ctx->output_writer_private;
if (dstate->slot)
ExecDropSingleTupleTableSlot(dstate->slot);
FreeDecodingContext(ctx);
ReplicationSlotDropAcquired();
}
/*
* Make snapshot available to the backend that launched the decoding worker.
*/
static void
export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
{
char fname[MAXPGPATH];
BufFile *file;
Size snap_size;
char *snap_space;
snap_size = EstimateSnapshotSpace(snapshot);
snap_space = (char *) palloc(snap_size);
SerializeSnapshot(snapshot, snap_space);
DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
file = BufFileCreateFileSet(&shared->sfs.fs, fname);
/* To make restoration easier, write the snapshot size first. */
BufFileWrite(file, &snap_size, sizeof(snap_size));
BufFileWrite(file, snap_space, snap_size);
pfree(snap_space);
BufFileClose(file);
/* Increase the counter to tell the backend that the file is available. */
SpinLockAcquire(&shared->mutex);
shared->last_exported++;
SpinLockRelease(&shared->mutex);
ConditionVariableSignal(&shared->cv);
}
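For illustration only, the size-prefixed file layout used by export_initial_snapshot() can be sketched with plain stdio standing in for PostgreSQL's BufFile API; the helper names `write_sized` and `read_sized` are invented for this sketch and are not part of the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Sketch of the size-prefixed framing: the serialized payload is preceded
 * by its size, which makes restoration on the reader side trivial.
 */
static void
write_sized(FILE *f, const void *data, size_t size)
{
	fwrite(&size, sizeof(size), 1, f);
	fwrite(data, 1, size, f);
}

static void *
read_sized(FILE *f, size_t *size)
{
	void	   *buf;

	if (fread(size, sizeof(*size), 1, f) != 1)
		return NULL;
	buf = malloc(*size);
	if (buf == NULL || fread(buf, 1, *size, f) != *size)
	{
		free(buf);
		return NULL;
	}
	return buf;
}
```

The reader never needs to guess where the payload ends, which is why the size is written first.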
/*
* Decode logical changes from the WAL sequence and store them to a file.
*
* If true is returned, there is no more work for the worker.
*/
static bool
decode_concurrent_changes(LogicalDecodingContext *ctx,
DecodingWorkerShared *shared)
{
RepackDecodingState *dstate;
XLogRecPtr lsn_upto;
bool done;
char fname[MAXPGPATH];
dstate = (RepackDecodingState *) ctx->output_writer_private;
/* Open the output file. */
DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
SpinLockAcquire(&shared->mutex);
lsn_upto = shared->lsn_upto;
done = shared->done;
SpinLockRelease(&shared->mutex);
while (true)
{
XLogRecord *record;
XLogSegNo segno_new;
char *errm = NULL;
XLogRecPtr end_lsn;
CHECK_FOR_INTERRUPTS();
record = XLogReadRecord(ctx->reader, &errm);
if (record)
{
LogicalDecodingProcessRecord(ctx, ctx->reader);
/*
* If a WAL segment boundary has been crossed, inform the decoding
* system that the catalog_xmin can advance.
*/
end_lsn = ctx->reader->EndRecPtr;
XLByteToSeg(end_lsn, segno_new, wal_segment_size);
if (segno_new != repack_current_segment)
{
LogicalConfirmReceivedLocation(end_lsn);
elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
(uint32) (end_lsn >> 32), (uint32) end_lsn);
repack_current_segment = segno_new;
}
}
else
{
ReadLocalXLogPageNoWaitPrivate *priv;
if (errm)
ereport(ERROR,
errmsg("%s", errm));
/*
* In the decoding loop we do not want to get blocked when there
* is no more WAL available, otherwise the loop would become
* uninterruptible.
*/
priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
if (priv->end_of_wal)
/* Do not miss the end of WAL condition next time. */
priv->end_of_wal = false;
else
ereport(ERROR,
errmsg("could not read WAL record"));
}
/*
* Whether or not we could read a new record, keep checking whether
* 'lsn_upto' has been specified.
*/
if (!XLogRecPtrIsValid(lsn_upto))
{
SpinLockAcquire(&shared->mutex);
lsn_upto = shared->lsn_upto;
/* 'done' should be set at the same time as 'lsn_upto' */
done = shared->done;
SpinLockRelease(&shared->mutex);
}
if (XLogRecPtrIsValid(lsn_upto) &&
ctx->reader->EndRecPtr >= lsn_upto)
break;
if (record == NULL)
{
int64 timeout = 0;
WaitLSNResult res;
/*
* Before we retry reading, wait until new WAL is flushed.
*
* There is a race condition such that the backend executing
* REPACK determines 'lsn_upto', but before it sets the shared
* variable, we reach the end of WAL. In that case we'd need to
* wait until the next WAL flush (unrelated to REPACK). Although
* that should not be a problem in a busy system, it might be
* noticeable in other cases, including regression tests (which
* are not necessarily executed in parallel). Therefore it makes
* sense to use a timeout.
*
* If lsn_upto is valid, WAL records having LSN lower than that
* should already have been flushed to disk.
*/
if (!XLogRecPtrIsValid(lsn_upto))
timeout = 100L;
res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
ctx->reader->EndRecPtr + 1,
timeout);
if (res != WAIT_LSN_RESULT_SUCCESS &&
res != WAIT_LSN_RESULT_TIMEOUT)
ereport(ERROR,
errmsg("waiting for WAL failed"));
}
}
/*
* Close the file so we can make it available to the backend.
*/
BufFileClose(dstate->file);
dstate->file = NULL;
SpinLockAcquire(&shared->mutex);
shared->lsn_upto = InvalidXLogRecPtr;
shared->last_exported++;
SpinLockRelease(&shared->mutex);
ConditionVariableSignal(&shared->cv);
return done;
}
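The segment-boundary check in the decoding loop above boils down to integer division of the LSN by the segment size: the segment number changes exactly when a boundary is crossed. A standalone sketch, assuming the default 16 MB segment size; the `toy_` names are invented and do not mirror PostgreSQL's XLByteToSeg() signature.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Default WAL segment size assumed for this sketch. */
#define TOY_WAL_SEGMENT_SIZE (16 * 1024 * 1024)

/* Segment number is just the LSN divided by the segment size. */
static uint64_t
toy_segment_number(uint64_t lsn)
{
	return lsn / TOY_WAL_SEGMENT_SIZE;
}

/* True iff the two positions fall into different WAL segments. */
static bool
crossed_segment_boundary(uint64_t prev_lsn, uint64_t end_lsn)
{
	return toy_segment_number(prev_lsn) != toy_segment_number(end_lsn);
}
```

Confirming the received location only on such crossings keeps the confirmations cheap while still letting catalog_xmin advance regularly.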
/*
* Does the WAL record contain a data change that this backend does not need
* to decode on behalf of REPACK (CONCURRENTLY)?
*/
bool
change_useless_for_repack(XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
RelFileLocator locator;
/* The TOAST locator should not be set unless the main one is. */
Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
OidIsValid(repacked_rel_locator.relNumber));
/*
* Backends not involved in REPACK (CONCURRENTLY) should not do the
* filtering.
*/
if (!OidIsValid(repacked_rel_locator.relNumber))
return false;
/*
* If the record does not contain block 0, it's probably not INSERT /
* UPDATE / DELETE. In any case, we do not have enough information to
* filter the change out.
*/
if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
return false;
/*
* Decode the change if it belongs to the table we are repacking, or if it
* belongs to its TOAST relation.
*/
if (RelFileLocatorEquals(locator, repacked_rel_locator))
return false;
if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
RelFileLocatorEquals(locator, repacked_rel_toast_locator))
return false;
/* Filter out changes of other tables. */
return true;
}

@@ -6058,6 +6058,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
finish_heap_swap(tab->relid, OIDNewHeap,
false, false, true,
!OidIsValid(tab->newTableSpace),
true, /* reindex */
RecentXmin,
ReadNextMultiXactId(),
persistence);

@@ -127,7 +127,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
BufferAccessStrategy bstrategy);
BufferAccessStrategy bstrategy, bool isTopLevel);
static double compute_parallel_delay(void);
static VacOptValue get_vacoptval_from_boolean(DefElem *def);
static bool vac_tid_reaped(ItemPointer itemptr, void *state);
@@ -630,7 +630,8 @@ vacuum(List *relations, const VacuumParams *params, BufferAccessStrategy bstrate
if (params->options & VACOPT_VACUUM)
{
if (!vacuum_rel(vrel->oid, vrel->relation, *params, bstrategy))
if (!vacuum_rel(vrel->oid, vrel->relation, *params, bstrategy,
isTopLevel))
continue;
}
@@ -2004,7 +2005,7 @@ vac_truncate_clog(TransactionId frozenXID,
*/
static bool
vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
BufferAccessStrategy bstrategy)
BufferAccessStrategy bstrategy, bool isTopLevel)
{
LOCKMODE lmode;
Relation rel;
@@ -2295,7 +2296,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
/* VACUUM FULL is a variant of REPACK; see repack.c */
cluster_rel(REPACK_COMMAND_VACUUMFULL, rel, InvalidOid,
&cluster_params);
&cluster_params, isTopLevel);
/* cluster_rel closes the relation, but keeps lock */
rel = NULL;
@@ -2338,7 +2339,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
toast_vacuum_params.options |= VACOPT_PROCESS_MAIN;
toast_vacuum_params.toast_parent = relid;
vacuum_rel(toast_relid, NULL, toast_vacuum_params, bstrategy);
vacuum_rel(toast_relid, NULL, toast_vacuum_params, bstrategy,
isTopLevel);
}
/*

@@ -14,6 +14,7 @@
#include "postgres.h"
#include "access/parallel.h"
#include "commands/repack.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
@@ -177,6 +178,10 @@ mq_putmessage(char msgtype, const char *s, size_t len)
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_PARALLEL_APPLY_MESSAGE,
pq_mq_parallel_leader_proc_number);
else if (AmRepackWorker())
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_REPACK_MESSAGE,
pq_mq_parallel_leader_proc_number);
else
{
Assert(IsParallelWorker());

@@ -219,5 +219,6 @@ pg_test_mod_args = pg_mod_args + {
subdir('jit/llvm')
subdir('replication/libpqwalreceiver')
subdir('replication/pgoutput')
subdir('replication/pgrepack')
subdir('snowball')
subdir('utils/mb/conversion_procs')

@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/parallel.h"
#include "commands/repack.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -146,6 +147,10 @@ static const struct
.fn_name = "ParallelWorkerMain",
.fn_addr = ParallelWorkerMain
},
{
.fn_name = "RepackWorkerMain",
.fn_addr = RepackWorkerMain
},
{
.fn_name = "SequenceSyncWorkerMain",
.fn_addr = SequenceSyncWorkerMain

@@ -33,6 +33,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecord.h"
#include "catalog/pg_control.h"
#include "commands/repack.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/message.h"
@@ -436,7 +437,8 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
case XLOG_HEAP2_MULTI_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
!ctx->fast_forward &&
!change_useless_for_repack(buf))
DecodeMultiInsert(ctx, buf);
break;
case XLOG_HEAP2_NEW_CID:
@@ -498,7 +500,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
case XLOG_HEAP_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
!ctx->fast_forward &&
!change_useless_for_repack(buf))
DecodeInsert(ctx, buf);
break;
@@ -510,19 +513,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_HEAP_HOT_UPDATE:
case XLOG_HEAP_UPDATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
!ctx->fast_forward &&
!change_useless_for_repack(buf))
DecodeUpdate(ctx, buf);
break;
case XLOG_HEAP_DELETE:
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
!ctx->fast_forward &&
!change_useless_for_repack(buf))
DecodeDelete(ctx, buf);
break;
case XLOG_HEAP_TRUNCATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
!ctx->fast_forward &&
!change_useless_for_repack(buf))
DecodeTruncate(ctx, buf);
break;
@@ -538,7 +544,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_HEAP_CONFIRM:
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
!ctx->fast_forward &&
!change_useless_for_repack(buf))
DecodeSpecConfirm(ctx, buf);
break;
@@ -1035,6 +1042,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xlrec = (xl_heap_delete *) XLogRecGetData(r);
/*
* Skip changes that were marked as ignorable at origin.
*
* (This is used for changes that affect relations not visible to other
* transactions, such as the transient table during concurrent repack.)
*/
if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
return;
/* only interested in our database */
XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
if (target_locator.dbOid != ctx->slot->data.database)

@@ -0,0 +1,32 @@
#-------------------------------------------------------------------------
#
# Makefile--
# Makefile for src/backend/replication/pgrepack
#
# IDENTIFICATION
# src/backend/replication/pgrepack
#
#-------------------------------------------------------------------------
subdir = src/backend/replication/pgrepack
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = \
$(WIN32RES) \
pgrepack.o
PGFILEDESC = "pgrepack - logical replication output plugin for REPACK"
NAME = pgrepack
all: all-shared-lib
include $(top_srcdir)/src/Makefile.shlib
install: all installdirs install-lib
installdirs: installdirs-lib
uninstall: uninstall-lib
clean distclean: clean-lib
rm -f $(OBJS)

@@ -0,0 +1,18 @@
# Copyright (c) 2026, PostgreSQL Global Development Group
pgrepack_sources = files(
'pgrepack.c',
)
if host_system == 'windows'
pgrepack_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
'--NAME', 'pgrepack',
'--FILEDESC', 'pgrepack - logical replication output plugin for REPACK',])
endif
pgrepack = shared_module('pgrepack',
pgrepack_sources,
kwargs: pg_mod_args,
)
backend_targets += pgrepack

@@ -0,0 +1,287 @@
/*-------------------------------------------------------------------------
*
* pgrepack.c
* Logical Replication output plugin for REPACK command
*
* Copyright (c) 2026, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/pgrepack/pgrepack.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/detoast.h"
#include "commands/repack_internal.h"
#include "replication/snapbuild.h"
#include "utils/memutils.h"
PG_MODULE_MAGIC;
static void repack_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt, bool is_init);
static void repack_shutdown(LogicalDecodingContext *ctx);
static void repack_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void repack_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation rel, ReorderBufferChange *change);
static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
ConcurrentChangeKind kind, HeapTuple tuple);
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
cb->startup_cb = repack_startup;
cb->begin_cb = repack_begin_txn;
cb->change_cb = repack_process_change;
cb->commit_cb = repack_commit_txn;
cb->shutdown_cb = repack_shutdown;
}
/* initialize this plugin */
static void
repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
ctx->output_plugin_private = NULL;
/* Probably unnecessary, as we don't use the SQL interface ... */
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
if (ctx->output_plugin_options != NIL)
{
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("this plugin does not expect any options"));
}
}
static void
repack_shutdown(LogicalDecodingContext *ctx)
{
}
/*
* As we don't release the slot while processing a particular table, there's
* no room for an SQL interface, even for debugging purposes. Therefore we need
* neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
* callbacks. (Although we might want to write custom callbacks, this API
* seems to be unnecessarily generic for our purposes.)
*/
/* BEGIN callback */
static void
repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
}
/* COMMIT callback */
static void
repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
}
/*
* Callback for individual changed tuples
*/
static void
repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
(RepackDecodingState *) ctx->output_writer_private;
/* Changes of other relations should not have been decoded. */
Assert(RelationGetRelid(relation) == private->relid);
/* Decode entry depending on its type */
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
{
HeapTuple newtuple;
newtuple = change->data.tp.newtuple;
/*
* Identity checks in the main function should have made this
* impossible.
*/
if (newtuple == NULL)
elog(ERROR, "incomplete insert info");
repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE:
{
HeapTuple oldtuple,
newtuple;
oldtuple = change->data.tp.oldtuple;
newtuple = change->data.tp.newtuple;
if (newtuple == NULL)
elog(ERROR, "incomplete update info");
if (oldtuple != NULL)
repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
}
break;
case REORDER_BUFFER_CHANGE_DELETE:
{
HeapTuple oldtuple;
oldtuple = change->data.tp.oldtuple;
if (oldtuple == NULL)
elog(ERROR, "incomplete delete info");
repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
}
break;
default:
/*
* Should not come here. This includes TRUNCATE of the table being
* processed. heap_decode() cannot check the file locator easily,
* but we assume that TRUNCATE uses AccessExclusiveLock on the
* table so it should not occur during REPACK (CONCURRENTLY).
*/
Assert(false);
break;
}
}
/*
* Write the given tuple, with the given change kind, to the repack spill
* file. Later, the backend running REPACK can read these and replay
* the operations on the new copy of the table.
*
* For each change affecting the table being repacked, we store enough
* information about the tuple so that the change can be replayed on the
* new copy of the table.
*/
static void
repack_store_change(LogicalDecodingContext *ctx, Relation relation,
ConcurrentChangeKind kind, HeapTuple tuple)
{
RepackDecodingState *dstate;
MemoryContext oldcxt;
BufFile *file;
List *attrs_ext = NIL;
int natt_ext;
dstate = (RepackDecodingState *) ctx->output_writer_private;
file = dstate->file;
/* Store the change kind. */
BufFileWrite(file, &kind, 1);
/* Use a frequently-reset context to avoid dealing with leaks manually */
oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
/*
* If the tuple contains "external indirect" attributes, we need to write
* the contents to the file because we have no control over that memory.
*/
if (HeapTupleHasExternal(tuple))
{
TupleDesc desc = RelationGetDescr(relation);
TupleTableSlot *slot;
/* Initialize the slot, if not done already */
if (dstate->slot == NULL)
{
ResourceOwner saveResourceOwner;
MemoryContextSwitchTo(dstate->worker_cxt);
saveResourceOwner = CurrentResourceOwner;
CurrentResourceOwner = dstate->worker_resowner;
dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
MemoryContextSwitchTo(dstate->change_cxt);
CurrentResourceOwner = saveResourceOwner;
}
slot = dstate->slot;
ExecStoreHeapTuple(tuple, slot, false);
/*
* Loop over all attributes, and find out which ones we need to spill
* separately, to wit: each one that's a non-null varlena and stored
* out of line.
*/
for (int i = 0; i < desc->natts; i++)
{
CompactAttribute *attr = TupleDescCompactAttr(desc, i);
varlena *varlen;
if (attr->attisdropped || attr->attlen != -1 ||
slot_attisnull(slot, i + 1))
continue;
slot_getsomeattrs(slot, i + 1);
/*
* This is a non-null varlena datum, but we only care if it's
* out-of-line
*/
varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
if (!VARATT_IS_EXTERNAL(varlen))
continue;
/*
* We spill any indirect-external attributes separately from the
* heap tuple. Anything else is written as is.
*/
if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
attrs_ext = lappend(attrs_ext, varlen);
else
{
/*
* Logical decoding should not produce "external expanded"
* attributes (those actually should never appear on disk), so
* only TOASTed attributes can be seen here.
*
* We get here if the table has external values but only
* in-line values are being updated now.
*/
Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
}
}
ExecClearTuple(slot);
}
/*
* First, write the original heap tuple, prefixed by its length. Note
* that the external-toast tag for each toasted attribute will be present
* in what we write, so that we know where to restore each one later.
*/
BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
BufFileWrite(file, tuple->t_data, tuple->t_len);
/* Then, write the number of external attributes we found. */
natt_ext = list_length(attrs_ext);
BufFileWrite(file, &natt_ext, sizeof(natt_ext));
/* Finally, the attributes themselves, if any */
foreach_ptr(varlena, attr_val, attrs_ext)
{
attr_val = detoast_external_attr(attr_val);
BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
/* These attributes could be large, so free them right away */
pfree(attr_val);
}
/* Cleanup. */
MemoryContextSwitchTo(oldcxt);
MemoryContextReset(dstate->change_cxt);
}
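For illustration only, the per-change record layout that repack_store_change() writes to the spill file (a one-byte change kind, the tuple length, the raw tuple bytes, then the count of separately spilled external attributes) can be sketched with plain stdio in place of BufFile; the `ToyChange` struct and `toy_` function names are invented for this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Simplified model of one spilled change record. */
typedef struct ToyChange
{
	char		kind;			/* 'i', 'u', 'U' or 'd' */
	uint32_t	len;			/* length of the tuple data */
	unsigned char *data;		/* raw tuple bytes */
	int			natt_ext;		/* separately spilled external attrs */
} ToyChange;

/* Write one record: kind, length-prefixed tuple, external-attr count. */
static void
toy_write_change(FILE *f, const ToyChange *c)
{
	fwrite(&c->kind, 1, 1, f);
	fwrite(&c->len, sizeof(c->len), 1, f);
	fwrite(c->data, 1, c->len, f);
	fwrite(&c->natt_ext, sizeof(c->natt_ext), 1, f);
}

/* Read one record back in the same order. */
static bool
toy_read_change(FILE *f, ToyChange *c)
{
	if (fread(&c->kind, 1, 1, f) != 1 ||
		fread(&c->len, sizeof(c->len), 1, f) != 1)
		return false;
	c->data = malloc(c->len);
	if (c->data == NULL || fread(c->data, 1, c->len, f) != c->len)
		return false;
	return fread(&c->natt_ext, sizeof(c->natt_ext), 1, f) == 1;
}
```

Because each field is either fixed-size or length-prefixed, the reader can walk the file without any delimiter scanning.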

@@ -707,6 +707,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
HandleParallelApplyMessageInterrupt();
if (CheckProcSignal(PROCSIG_REPACK_MESSAGE))
HandleRepackMessageInterrupt();
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT))
HandleRecoveryConflictInterrupt();

@@ -3578,6 +3578,9 @@ ProcessInterrupts(void)
if (ParallelApplyMessagePending)
ProcessParallelApplyMessages();
if (RepackMessagePending)
ProcessRepackMessages();
}
/*

@@ -156,6 +156,7 @@ RECOVERY_CONFLICT_SNAPSHOT "Waiting for recovery conflict resolution for a vacuu
RECOVERY_CONFLICT_TABLESPACE "Waiting for recovery conflict resolution for dropping a tablespace."
RECOVERY_END_COMMAND "Waiting for <xref linkend="guc-recovery-end-command"/> to complete."
RECOVERY_PAUSE "Waiting for recovery to be resumed."
REPACK_WORKER_EXPORT "Waiting for decoding worker to export a new output file."
REPLICATION_ORIGIN_DROP "Waiting for a replication origin to become inactive so it can be dropped."
REPLICATION_SLOT_DROP "Waiting for a replication slot to become inactive so it can be dropped."
RESTORE_COMMAND "Waiting for <xref linkend="guc-restore-command"/> to complete."

@@ -5236,8 +5236,8 @@ match_previous_words(int pattern_id,
* one word, so the above test is correct.
*/
if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
COMPLETE_WITH("ANALYZE", "VERBOSE");
else if (TailMatches("ANALYZE", "VERBOSE"))
COMPLETE_WITH("ANALYZE", "CONCURRENTLY", "VERBOSE");
else if (TailMatches("ANALYZE", "CONCURRENTLY", "VERBOSE"))
COMPLETE_WITH("ON", "OFF");
}

@@ -104,6 +104,8 @@
#define XLH_DELETE_CONTAINS_OLD_KEY (1<<2)
#define XLH_DELETE_IS_SUPER (1<<3)
#define XLH_DELETE_IS_PARTITION_MOVE (1<<4)
/* See heap_delete() */
#define XLH_DELETE_NO_LOGICAL (1<<5)
/* convenience macro for checking whether any form of old tuple was logged */
#define XLH_DELETE_CONTAINS_OLD \

@@ -284,9 +284,10 @@ typedef struct TM_IndexDeleteOp
/* "options" flag bits for table_tuple_delete */
#define TABLE_DELETE_CHANGING_PARTITION (1 << 0)
#define TABLE_DELETE_NO_LOGICAL (1 << 1)
/* "options" flag bits for table_tuple_update */
/* XXX none at present */
#define TABLE_UPDATE_NO_LOGICAL (1 << 0)
/* flag bits for table_tuple_lock */
/* Follow tuples whose update is in progress if lock modes don't conflict */
@@ -662,6 +663,7 @@ typedef struct TableAmRoutine
Relation OldIndex,
bool use_sort,
TransactionId OldestXmin,
Snapshot snapshot,
TransactionId *xid_cutoff,
MultiXactId *multi_cutoff,
double *num_tuples,
@@ -1563,7 +1565,12 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
* cmax/cmin if successful)
* options - bitmask of options. No values are currently recognized.
* crosscheck - if not InvalidSnapshot, also check old tuple against this
* wait - true if should wait for any conflicting update to commit/abort
* options - These allow the caller to specify options that may change the
* behavior of the AM. The AM will ignore options that it does not support.
* TABLE_UPDATE_WAIT -- set if should wait for any conflicting update to
* commit/abort
* TABLE_UPDATE_NO_LOGICAL -- force-disables the emitting of logical
* decoding information for the tuple.
*
* Output parameters:
* slot - newly constructed tuple data to store
@@ -1725,6 +1732,8 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator)
* not needed for the relation's AM
* - *xid_cutoff - ditto
* - *multi_cutoff - ditto
* - snapshot - if != NULL, ignore data changes done by transactions that this
* (MVCC) snapshot considers still in-progress or in the future.
*
* Output parameters:
* - *xid_cutoff - rel's new relfrozenxid value, may be invalid
@@ -1737,6 +1746,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
Relation OldIndex,
bool use_sort,
TransactionId OldestXmin,
Snapshot snapshot,
TransactionId *xid_cutoff,
MultiXactId *multi_cutoff,
double *num_tuples,
@@ -1745,6 +1755,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
{
OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex,
use_sort, OldestXmin,
snapshot,
xid_cutoff, multi_cutoff,
num_tuples, tups_vacuumed,
tups_recently_dead);

@@ -86,10 +86,12 @@
#define PROGRESS_REPACK_PHASE 1
#define PROGRESS_REPACK_INDEX_RELID 2
#define PROGRESS_REPACK_HEAP_TUPLES_SCANNED 3
#define PROGRESS_REPACK_HEAP_TUPLES_WRITTEN 4
#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 5
#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 6
#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 7
#define PROGRESS_REPACK_HEAP_TUPLES_INSERTED 4
#define PROGRESS_REPACK_HEAP_TUPLES_UPDATED 5
#define PROGRESS_REPACK_HEAP_TUPLES_DELETED 6
#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 7
#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 8
#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 9
/*
* Phases of repack (as advertised via PROGRESS_REPACK_PHASE).
@@ -98,9 +100,10 @@
#define PROGRESS_REPACK_PHASE_INDEX_SCAN_HEAP 2
#define PROGRESS_REPACK_PHASE_SORT_TUPLES 3
#define PROGRESS_REPACK_PHASE_WRITE_NEW_HEAP 4
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 5
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 6
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 7
#define PROGRESS_REPACK_PHASE_CATCH_UP 5
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 6
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 7
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 8
/* Progress parameters for CREATE INDEX */
/* 3, 4 and 5 reserved for "waitfor" metrics */

@@ -13,6 +13,8 @@
#ifndef REPACK_H
#define REPACK_H
#include <signal.h>
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
#include "storage/lockdefs.h"
@@ -25,6 +27,7 @@
#define CLUOPT_RECHECK_ISCLUSTERED 0x04 /* recheck relation state for
* indisclustered */
#define CLUOPT_ANALYZE 0x08 /* do an ANALYZE */
#define CLUOPT_CONCURRENT 0x10 /* allow concurrent data changes */
/* options for CLUSTER */
typedef struct ClusterParams
@@ -32,11 +35,13 @@ typedef struct ClusterParams
uint32 options; /* bitmask of CLUOPT_* */
} ClusterParams;
extern PGDLLIMPORT volatile sig_atomic_t RepackMessagePending;
extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
extern void cluster_rel(RepackCommand command, Relation OldHeap, Oid indexOid,
ClusterParams *params);
ClusterParams *params, bool isTopLevel);
extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
LOCKMODE lockmode);
extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
@@ -48,8 +53,16 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
bool swap_toast_by_content,
bool check_constraints,
bool is_internal,
bool reindex,
TransactionId frozenXid,
MultiXactId cutoffMulti,
char newrelpersistence);
extern void HandleRepackMessageInterrupt(void);
extern void ProcessRepackMessages(void);
/* in repack_worker.c */
extern void RepackWorkerMain(Datum main_arg);
extern bool AmRepackWorker(void);
#endif /* REPACK_H */

@@ -0,0 +1,125 @@
/*-------------------------------------------------------------------------
*
* repack_internal.h
* header for REPACK internals
*
* Copyright (c) 2026, PostgreSQL Global Development Group
*
* src/include/commands/repack_internal.h
*
*-------------------------------------------------------------------------
*/
#ifndef REPACK_INTERNAL_H
#define REPACK_INTERNAL_H
#include "nodes/execnodes.h"
#include "replication/decode.h"
#include "postmaster/bgworker.h"
#include "replication/logical.h"
#include "storage/buffile.h"
#include "storage/sharedfileset.h"
#include "storage/shm_mq.h"
#include "utils/resowner.h"
/*
* The type of a change stored in the output files.
*/
typedef char ConcurrentChangeKind;
#define CHANGE_INSERT 'i'
#define CHANGE_UPDATE_OLD 'u'
#define CHANGE_UPDATE_NEW 'U'
#define CHANGE_DELETE 'd'
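Each change that the decoding worker stashes in its output file is tagged with one of the kinds above. As a loose illustration of such tagging (the framing below is hypothetical, not the actual format REPACK writes via BufFile), a change could be serialized as a one-byte kind followed by a length-prefixed tuple image:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Mirror of the kind tags; values match the #defines above. */
#define CHANGE_INSERT     'i'
#define CHANGE_UPDATE_OLD 'u'
#define CHANGE_UPDATE_NEW 'U'
#define CHANGE_DELETE     'd'

/*
 * Frame one change as: 1-byte kind, 4-byte little-endian length, payload.
 * Returns the number of bytes written, or 0 if the buffer is too small.
 * Illustrative layout only, not what the repack code actually writes.
 */
static size_t
frame_change(char kind, const void *tuple, uint32_t len,
			 unsigned char *out, size_t outlen)
{
	if (outlen < 5 + (size_t) len)
		return 0;
	out[0] = (unsigned char) kind;
	out[1] = (unsigned char) (len & 0xff);
	out[2] = (unsigned char) ((len >> 8) & 0xff);
	out[3] = (unsigned char) ((len >> 16) & 0xff);
	out[4] = (unsigned char) ((len >> 24) & 0xff);
	memcpy(out + 5, tuple, len);
	return 5 + (size_t) len;
}

/* Read back the kind and payload length of a framed change. */
static char
read_change(const unsigned char *in, uint32_t *len)
{
	*len = (uint32_t) in[0 + 1] | ((uint32_t) in[2] << 8) |
		((uint32_t) in[3] << 16) | ((uint32_t) in[4] << 24);
	return (char) in[0];
}
```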
/*
* Logical decoding state.
*
* The output plugin uses it to store the data changes that it decodes from
* WAL while the table contents are being copied to new storage.
*/
typedef struct RepackDecodingState
{
#ifdef USE_ASSERT_CHECKING
/* The relation whose changes we're decoding. */
Oid relid;
#endif
/* Per-change memory context. */
MemoryContext change_cxt;
/* A tuple slot used to pass tuples back and forth */
TupleTableSlot *slot;
/*
* Memory context and resource owner of the decoding worker's transaction.
*/
MemoryContext worker_cxt;
ResourceOwner worker_resowner;
/* The current output file. */
BufFile *file;
} RepackDecodingState;
/*
* Shared memory used for communication between the backend running REPACK and
* the worker that performs logical decoding of data changes.
*/
typedef struct DecodingWorkerShared
{
/* Is the decoding initialized? */
bool initialized;
/*
* Once the worker has reached this LSN, it should close the current
* output file and either create a new one or exit, according to the field
* 'done'. If the value is InvalidXLogRecPtr, the worker should decode all
* the WAL available and keep checking this field. It is OK if the worker
* has already decoded records whose LSN is >= lsn_upto by the time this
* field is set.
*/
XLogRecPtr lsn_upto;
/* Exit after closing the current file? */
bool done;
/* The output is stored here. */
SharedFileSet sfs;
/* Number of the last file exported by the worker. */
int last_exported;
/* Synchronize access to the fields above. */
slock_t mutex;
/* Database to connect to. */
Oid dbid;
/* Role to connect as. */
Oid roleid;
/* Relation whose data changes should be decoded. */
Oid relid;
/* CV the backend waits on */
ConditionVariable cv;
/* Info to signal the backend. */
PGPROC *backend_proc;
pid_t backend_pid;
ProcNumber backend_proc_number;
dsm_segment *dsm_seg;
/*
* Memory where the error queue is located.
*
* For considerations on the queue size, see the comments for
* PARALLEL_ERROR_QUEUE_SIZE.
*/
#define REPACK_ERROR_QUEUE_SIZE 16384
char error_queue[FLEXIBLE_ARRAY_MEMBER];
} DecodingWorkerShared;
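The lsn_upto/done handshake documented in the struct above can be summarized as a small decision function: after decoding each record, the worker consults the shared fields (with the spinlock held in the real code) and decides whether to keep decoding, rotate the output file, or exit. The names and types below are stand-ins for illustration, not the actual worker code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Stand-in for the relevant DecodingWorkerShared fields. */
typedef struct WorkerShared
{
	XLogRecPtr	lsn_upto;		/* close the current file at this LSN */
	bool		done;			/* exit after closing the current file? */
} WorkerShared;

typedef enum
{
	WORKER_KEEP_DECODING,		/* lsn_upto unset or not reached yet */
	WORKER_ROTATE_FILE,			/* close the file and create a new one */
	WORKER_EXIT					/* close the file and terminate */
} WorkerAction;

/*
 * Decide what to do after having decoded up to 'decoded_lsn'.  Note that it
 * is fine if decoded_lsn had already passed lsn_upto before the backend set
 * the field: the check still fires on the next iteration.
 */
static WorkerAction
next_action(const WorkerShared *shared, XLogRecPtr decoded_lsn)
{
	if (shared->lsn_upto == InvalidXLogRecPtr ||
		decoded_lsn < shared->lsn_upto)
		return WORKER_KEEP_DECODING;
	return shared->done ? WORKER_EXIT : WORKER_ROTATE_FILE;
}
```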
extern void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq);
#endif /* REPACK_INTERNAL_H */


@@ -32,4 +32,8 @@ extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record);
/* in commands/repack_worker.c */
extern bool change_useless_for_repack(XLogRecordBuffer *buf);
#endif


@@ -36,8 +36,8 @@ typedef int LOCKMODE;
#define AccessShareLock 1 /* SELECT */
#define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE */
#define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE */
#define ShareUpdateExclusiveLock 4 /* VACUUM (non-FULL), ANALYZE, CREATE
* INDEX CONCURRENTLY */
#define ShareUpdateExclusiveLock 4 /* VACUUM (non-exclusive), ANALYZE, CREATE
* INDEX CONCURRENTLY, REPACK CONCURRENTLY */
#define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY) */
#define ShareRowExclusiveLock 6 /* like EXCLUSIVE MODE, but allows ROW
* SHARE */


@@ -36,6 +36,7 @@ typedef enum
PROCSIG_BARRIER, /* global barrier interrupt */
PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
PROCSIG_REPACK_MESSAGE, /* Message from repack worker */
PROCSIG_RECOVERY_CONFLICT, /* backend is blocking recovery, check
* PGPROC->pendingRecoveryConflicts for the
* reason */


@@ -16,6 +16,7 @@ ifneq (,$(findstring backend,$(subdir)))
ifeq (,$(findstring conversion_procs,$(subdir)))
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
ifeq (,$(findstring replication/pgoutput,$(subdir)))
ifeq (,$(findstring replication/pgrepack,$(subdir)))
ifeq (,$(findstring snowball,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL
endif
@@ -23,6 +24,7 @@ endif
endif
endif
endif
endif
ifneq (,$(findstring src/common,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL


@@ -14,6 +14,7 @@ ifneq (,$(findstring backend,$(subdir)))
ifeq (,$(findstring conversion_procs,$(subdir)))
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
ifeq (,$(findstring replication/pgoutput,$(subdir)))
ifeq (,$(findstring replication/pgrepack,$(subdir)))
ifeq (,$(findstring snowball,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL
endif
@@ -21,6 +22,7 @@ endif
endif
endif
endif
endif
ifneq (,$(findstring src/common,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL


@@ -14,6 +14,8 @@ REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
ISOLATION = basic \
inplace \
repack \
repack_toast \
syscache-update-pruned \
heap_lock_update


@@ -0,0 +1,113 @@
Parsed test spec with 2 sessions
starting permutation: wait_before_lock change_existing change_new change_subxact1 change_subxact2 check2 wakeup_before_lock check1
injection_points_attach
-----------------------
(1 row)
step wait_before_lock:
REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
<waiting ...>
step change_existing:
UPDATE repack_test SET i=10 where i=1;
UPDATE repack_test SET j=20 where i=2;
UPDATE repack_test SET i=30 where i=3;
UPDATE repack_test SET i=40 where i=30;
DELETE FROM repack_test WHERE i=4;
step change_new:
INSERT INTO repack_test(i, j) VALUES (5, 5), (6, 6), (7, 7), (8, 8);
UPDATE repack_test SET i=50 where i=5;
UPDATE repack_test SET j=60 where i=6;
DELETE FROM repack_test WHERE i=7;
step change_subxact1:
BEGIN;
INSERT INTO repack_test(i, j) VALUES (100, 100);
SAVEPOINT s1;
UPDATE repack_test SET i=101 where i=100;
SAVEPOINT s2;
UPDATE repack_test SET i=102 where i=101;
COMMIT;
step change_subxact2:
BEGIN;
SAVEPOINT s1;
INSERT INTO repack_test(i, j) VALUES (110, 110);
ROLLBACK TO SAVEPOINT s1;
INSERT INTO repack_test(i, j) VALUES (110, 111);
COMMIT;
step check2:
INSERT INTO relfilenodes(node)
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
SELECT i, j FROM repack_test ORDER BY i, j;
INSERT INTO data_s2(i, j)
SELECT i, j FROM repack_test;
i| j
---+---
2| 20
6| 60
8| 8
10| 1
40| 3
50| 5
102|100
110|111
(8 rows)
step wakeup_before_lock:
SELECT injection_points_wakeup('repack-concurrently-before-lock');
injection_points_wakeup
-----------------------
(1 row)
step wait_before_lock: <... completed>
step check1:
INSERT INTO relfilenodes(node)
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
SELECT count(DISTINCT node) FROM relfilenodes;
SELECT i, j FROM repack_test ORDER BY i, j;
INSERT INTO data_s1(i, j)
SELECT i, j FROM repack_test;
SELECT count(*)
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
WHERE d1.i ISNULL OR d2.i ISNULL;
count
-----
2
(1 row)
i| j
---+---
2| 20
6| 60
8| 8
10| 1
40| 3
50| 5
102|100
110|111
(8 rows)
count
-----
0
(1 row)
injection_points_detach
-----------------------
(1 row)


@@ -0,0 +1,65 @@
Parsed test spec with 2 sessions
starting permutation: wait_before_lock change check2 wakeup_before_lock check1
injection_points_attach
-----------------------
(1 row)
step wait_before_lock:
REPACK (CONCURRENTLY) repack_test;
<waiting ...>
step change:
UPDATE repack_test SET j=get_long_string() where i=2;
DELETE FROM repack_test WHERE i=3;
INSERT INTO repack_test(i, j) VALUES (4, get_long_string());
UPDATE repack_test SET i=3 where i=1;
step check2:
INSERT INTO relfilenodes(node)
SELECT c2.relfilenode
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
WHERE c1.relname='repack_test';
INSERT INTO data_s2(i, j)
SELECT i, j FROM repack_test;
step wakeup_before_lock:
SELECT injection_points_wakeup('repack-concurrently-before-lock');
injection_points_wakeup
-----------------------
(1 row)
step wait_before_lock: <... completed>
step check1:
INSERT INTO relfilenodes(node)
SELECT c2.relfilenode
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
WHERE c1.relname='repack_test';
SELECT count(DISTINCT node) FROM relfilenodes;
INSERT INTO data_s1(i, j)
SELECT i, j FROM repack_test;
SELECT count(*)
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
WHERE d1.i ISNULL OR d2.i ISNULL;
count
-----
4
(1 row)
count
-----
0
(1 row)
injection_points_detach
-----------------------
(1 row)


@@ -45,6 +45,8 @@ tests += {
'specs': [
'basic',
'inplace',
'repack',
'repack_toast',
'syscache-update-pruned',
'heap_lock_update',
],


@@ -0,0 +1,142 @@
# REPACK (CONCURRENTLY) ... USING INDEX ...;
setup
{
CREATE EXTENSION injection_points;
CREATE TABLE repack_test(i int PRIMARY KEY, j int);
INSERT INTO repack_test(i, j) VALUES (1, 1), (2, 2), (3, 3), (4, 4);
CREATE TABLE relfilenodes(node oid);
CREATE TABLE data_s1(i int, j int);
CREATE TABLE data_s2(i int, j int);
}
teardown
{
DROP TABLE repack_test;
DROP EXTENSION injection_points;
DROP TABLE relfilenodes;
DROP TABLE data_s1;
DROP TABLE data_s2;
}
session s1
setup
{
SELECT injection_points_set_local();
SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
}
# Perform the initial load and wait for s2 to do some data changes.
step wait_before_lock
{
REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
}
# Check the table from the perspective of s1.
#
# Besides the contents, we also check that relfilenode has changed.
# Have each session write the contents into a table and use FULL JOIN to check
# if the outputs are identical.
step check1
{
INSERT INTO relfilenodes(node)
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
SELECT count(DISTINCT node) FROM relfilenodes;
SELECT i, j FROM repack_test ORDER BY i, j;
INSERT INTO data_s1(i, j)
SELECT i, j FROM repack_test;
SELECT count(*)
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
WHERE d1.i ISNULL OR d2.i ISNULL;
}
teardown
{
SELECT injection_points_detach('repack-concurrently-before-lock');
}
session s2
# Change the existing data. UPDATE changes both key and non-key columns. Also
# update one row twice to test whether a tuple version generated by this
# session can be found.
step change_existing
{
UPDATE repack_test SET i=10 where i=1;
UPDATE repack_test SET j=20 where i=2;
UPDATE repack_test SET i=30 where i=3;
UPDATE repack_test SET i=40 where i=30;
DELETE FROM repack_test WHERE i=4;
}
# Insert new rows and UPDATE / DELETE some of them. Again, update both key and
# non-key columns.
step change_new
{
INSERT INTO repack_test(i, j) VALUES (5, 5), (6, 6), (7, 7), (8, 8);
UPDATE repack_test SET i=50 where i=5;
UPDATE repack_test SET j=60 where i=6;
DELETE FROM repack_test WHERE i=7;
}
# When applying concurrent data changes, we should see the effects of an
# in-progress subtransaction.
#
# XXX Not sure this test is useful now - it was designed for the patch that
# preserves tuple visibility and which therefore modifies
# TransactionIdIsCurrentTransactionId().
step change_subxact1
{
BEGIN;
INSERT INTO repack_test(i, j) VALUES (100, 100);
SAVEPOINT s1;
UPDATE repack_test SET i=101 where i=100;
SAVEPOINT s2;
UPDATE repack_test SET i=102 where i=101;
COMMIT;
}
# When applying concurrent data changes, we should not see the effects of a
# rolled back subtransaction.
#
# XXX Is this test useful? See above.
step change_subxact2
{
BEGIN;
SAVEPOINT s1;
INSERT INTO repack_test(i, j) VALUES (110, 110);
ROLLBACK TO SAVEPOINT s1;
INSERT INTO repack_test(i, j) VALUES (110, 111);
COMMIT;
}
# Check the table from the perspective of s2.
step check2
{
INSERT INTO relfilenodes(node)
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
SELECT i, j FROM repack_test ORDER BY i, j;
INSERT INTO data_s2(i, j)
SELECT i, j FROM repack_test;
}
step wakeup_before_lock
{
SELECT injection_points_wakeup('repack-concurrently-before-lock');
}
# Test if data changes introduced while one session is performing REPACK
# CONCURRENTLY find their way into the table.
permutation
wait_before_lock
change_existing
change_new
change_subxact1
change_subxact2
check2
wakeup_before_lock
check1
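The check1/check2 steps above compare the two sessions' views of the table with a FULL JOIN: any row seen by only one session shows up with NULLs on the other side, so the final count must be 0. The same symmetric-difference count can be sketched in C over two sorted arrays of (i, j) pairs (illustration only; the spec itself performs the check in SQL):

```c
#include <assert.h>
#include <stddef.h>

typedef struct Row
{
	int			i;
	int			j;
} Row;

/* Compare rows by (i, j); both input arrays must be sorted this way. */
static int
row_cmp(const Row *a, const Row *b)
{
	if (a->i != b->i)
		return a->i < b->i ? -1 : 1;
	if (a->j != b->j)
		return a->j < b->j ? -1 : 1;
	return 0;
}

/*
 * Count rows present in exactly one of the two sorted arrays -- the
 * equivalent of FULL JOIN ... WHERE d1.i IS NULL OR d2.i IS NULL.
 */
static size_t
symmetric_diff_count(const Row *a, size_t na, const Row *b, size_t nb)
{
	size_t		ia = 0,
				ib = 0,
				count = 0;

	while (ia < na && ib < nb)
	{
		int			cmp = row_cmp(&a[ia], &b[ib]);

		if (cmp == 0)
		{
			ia++;				/* row present on both sides */
			ib++;
		}
		else if (cmp < 0)
		{
			count++;			/* row only in a */
			ia++;
		}
		else
		{
			count++;			/* row only in b */
			ib++;
		}
	}
	return count + (na - ia) + (nb - ib);
}
```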


@@ -0,0 +1,112 @@
# REPACK (CONCURRENTLY);
#
# Test handling of TOAST. At the same time, exercise the code path that does
# not use a tuplesort.
setup
{
CREATE EXTENSION injection_points;
-- Return a string that needs to be TOASTed.
CREATE FUNCTION get_long_string()
RETURNS text
LANGUAGE sql as $$
SELECT string_agg(chr(65 + trunc(25 * random())::int), '')
FROM generate_series(1, 2048) s(x);
$$;
CREATE TABLE repack_test(i int PRIMARY KEY, j text);
INSERT INTO repack_test(i, j) VALUES (1, get_long_string()),
(2, get_long_string()), (3, get_long_string());
CREATE TABLE relfilenodes(node oid);
CREATE TABLE data_s1(i int, j text);
CREATE TABLE data_s2(i int, j text);
}
teardown
{
DROP TABLE repack_test;
DROP EXTENSION injection_points;
DROP FUNCTION get_long_string();
DROP TABLE relfilenodes;
DROP TABLE data_s1;
DROP TABLE data_s2;
}
session s1
setup
{
SELECT injection_points_set_local();
SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
}
# Perform the initial load and wait for s2 to do some data changes.
step wait_before_lock
{
REPACK (CONCURRENTLY) repack_test;
}
# Check the table from the perspective of s1.
#
# Besides the contents, we also check that relfilenode has changed.
# Have each session write the contents into a table and use FULL JOIN to check
# if the outputs are identical.
step check1
{
INSERT INTO relfilenodes(node)
SELECT c2.relfilenode
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
WHERE c1.relname='repack_test';
SELECT count(DISTINCT node) FROM relfilenodes;
INSERT INTO data_s1(i, j)
SELECT i, j FROM repack_test;
SELECT count(*)
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
WHERE d1.i ISNULL OR d2.i ISNULL;
}
teardown
{
SELECT injection_points_detach('repack-concurrently-before-lock');
}
session s2
# Separately test UPDATEs of both the plain ("i") and the TOASTed ("j")
# attribute. In the first case, the new tuple we get from reorderbuffer.c
# contains "j" as a TOAST pointer, which we need to update so that it points
# to the new heap. In the latter case, we receive "j" as an "external
# indirect" value - here we test that the decoding worker writes the tuple to
# a file correctly and that the backend executing REPACK manages to restore
# it.
step change
{
UPDATE repack_test SET j=get_long_string() where i=2;
DELETE FROM repack_test WHERE i=3;
INSERT INTO repack_test(i, j) VALUES (4, get_long_string());
UPDATE repack_test SET i=3 where i=1;
}
# Check the table from the perspective of s2.
step check2
{
INSERT INTO relfilenodes(node)
SELECT c2.relfilenode
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
WHERE c1.relname='repack_test';
INSERT INTO data_s2(i, j)
SELECT i, j FROM repack_test;
}
step wakeup_before_lock
{
SELECT injection_points_wakeup('repack-concurrently-before-lock');
}
# Test if data changes introduced while one session is performing REPACK
# CONCURRENTLY find their way into the table.
permutation
wait_before_lock
change
check2
wakeup_before_lock
check1


@@ -537,6 +537,10 @@ SELECT relname, old.level, old.relkind, old.relfilenode = new.relfilenode FROM o
clstrpart33 | 2 | r | f
(7 rows)
-- CONCURRENTLY doesn't like partitioned tables
REPACK (CONCURRENTLY) clstrpart;
ERROR: REPACK (CONCURRENTLY) is not supported for partitioned tables
HINT: Consider running the command on individual partitions.
DROP TABLE clstrpart;
-- Ownership of partitions is checked
CREATE TABLE ptnowner(i int unique) PARTITION BY LIST (i);
@@ -802,6 +806,10 @@ ORDER BY o.relname;
clstr_3
(2 rows)
-- concurrently
REPACK (CONCURRENTLY) pg_class;
ERROR: cannot repack relation "pg_class"
HINT: REPACK CONCURRENTLY is not supported for catalog relations.
-- clean up
DROP TABLE clustertest;
DROP TABLE clstr_1;


@@ -2021,7 +2021,7 @@ pg_stat_progress_cluster| SELECT pid,
phase,
repack_index_relid AS cluster_index_relid,
heap_tuples_scanned,
heap_tuples_written,
(heap_tuples_inserted + heap_tuples_updated) AS heap_tuples_written,
heap_blks_total,
heap_blks_scanned,
index_rebuild_count
@@ -2136,17 +2136,20 @@ pg_stat_progress_repack| SELECT s.pid,
WHEN 2 THEN 'index scanning heap'::text
WHEN 3 THEN 'sorting tuples'::text
WHEN 4 THEN 'writing new heap'::text
WHEN 5 THEN 'swapping relation files'::text
WHEN 6 THEN 'rebuilding index'::text
WHEN 7 THEN 'performing final cleanup'::text
WHEN 5 THEN 'catch-up'::text
WHEN 6 THEN 'swapping relation files'::text
WHEN 7 THEN 'rebuilding index'::text
WHEN 8 THEN 'performing final cleanup'::text
ELSE NULL::text
END AS phase,
(s.param3)::oid AS repack_index_relid,
s.param4 AS heap_tuples_scanned,
s.param5 AS heap_tuples_written,
s.param6 AS heap_blks_total,
s.param7 AS heap_blks_scanned,
s.param8 AS index_rebuild_count
s.param5 AS heap_tuples_inserted,
s.param6 AS heap_tuples_updated,
s.param7 AS heap_tuples_deleted,
s.param8 AS heap_blks_total,
s.param9 AS heap_blks_scanned,
s.param10 AS index_rebuild_count
FROM (pg_stat_get_progress_info('REPACK'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_vacuum| SELECT s.pid,


@@ -248,6 +248,9 @@ REPACK clstrpart;
CREATE TEMP TABLE new_cluster_info AS SELECT relname, level, relfilenode, relkind FROM pg_partition_tree('clstrpart'::regclass) AS tree JOIN pg_class c ON c.oid=tree.relid ;
SELECT relname, old.level, old.relkind, old.relfilenode = new.relfilenode FROM old_cluster_info AS old JOIN new_cluster_info AS new USING (relname) ORDER BY relname COLLATE "C";
-- CONCURRENTLY doesn't like partitioned tables
REPACK (CONCURRENTLY) clstrpart;
DROP TABLE clstrpart;
-- Ownership of partitions is checked
@@ -383,6 +386,9 @@ JOIN relnodes_new n ON o.relname = n.relname
WHERE o.relfilenode <> n.relfilenode
ORDER BY o.relname;
-- concurrently
REPACK (CONCURRENTLY) pg_class;
-- clean up
DROP TABLE clustertest;
DROP TABLE clstr_1;


@@ -430,6 +430,7 @@ CatCacheHeader
CatalogId
CatalogIdMapEntry
CatalogIndexState
ChangeContext
ChangeVarNodes_callback
ChangeVarNodes_context
ChannelName
@@ -509,6 +510,7 @@ CompressFileHandle
CompressionLocation
CompressorState
ComputeXidHorizonsResult
ConcurrentChangeKind
ConditionVariable
ConditionVariableMinimallyPadded
ConditionalStack
@@ -655,6 +657,8 @@ DeclareCursorStmt
DecodedBkpBlock
DecodedXLogRecord
DecodingOutputState
DecodingWorker
DecodingWorkerShared
DefElem
DefElemAction
DefaultACLInfo
@@ -1318,6 +1322,7 @@ IndexElem
IndexFetchHeapData
IndexFetchTableData
IndexInfo
IndexInsertState
IndexList
IndexOnlyScan
IndexOnlyScanState
@@ -2635,6 +2640,7 @@ ReorderBufferTupleCidKey
ReorderBufferUpdateProgressTxnCB
ReorderTuple
RepackCommand
RepackDecodingState
RepackStmt
ReparameterizeForeignPathByChild_function
ReplOriginId