mirror of
https://github.com/postgres/postgres.git
synced 2026-04-10 11:37:33 -04:00
Add CONCURRENTLY option to REPACK
When this flag is specified, REPACK no longer acquires access-exclusive lock while the new copy of the table is being created; instead, it creates the initial copy under share-update-exclusive lock only (same as vacuum, etc), and it follows an MVCC snapshot; it sets up a replication slot starting at that snapshot, and uses a concurrent background worker to do logical decoding starting at the snapshot to populate a stash of concurrent data changes. Those changes can then be re-applied to the new copy of the table just before swapping the relfilenodes. Applications can continue to access the original copy of the table normally until just before the swap, which is the only point at which the access-exclusive lock is needed. There are some loose ends in this commit: 1. concurrent repack needs its own replication slot in order to apply logical decoding, which are a scarce resource and easy to run out of. 2. due to the way the historic snapshot is initially set up, only one REPACK process can be running at any one time on the whole system. 3. there's a danger of deadlocking (and thus abort) due to the lock upgrade required at the final phase. These issues will be addressed in upcoming commits. The design and most of the code are by Antonin Houska, heavily based on his own pg_squeeze third-party implementation. Author: Antonin Houska <ah@cybertec.at> Co-authored-by: Mihail Nikalayeu <mihailnikalayeu@gmail.com> Co-authored-by: Álvaro Herrera <alvherre@kurilemu.de> Reviewed-by: Matthias van de Meent <boekewurm+postgres@gmail.com> Reviewed-by: Srinath Reddy Sadipiralla <srinath2133@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Jim Jones <jim.jones@uni-muenster.de> Reviewed-by: Robert Treat <rob@xzilla.net> Reviewed-by: Noriyoshi Shinoda <noriyoshi.shinoda@hpe.com> Reviewed-by: vignesh C <vignesh21@gmail.com> Discussion: https://postgr.es/m/5186.1706694913@antos Discussion: https://postgr.es/m/202507262156.sb455angijk6@alvherre.pgsql
This commit is contained in:
parent
10484c2cc7
commit
28d534e2ae
46 changed files with 3689 additions and 267 deletions
|
|
@ -6990,14 +6990,35 @@ FROM pg_stat_get_backend_idset() AS backendid;
|
|||
|
||||
<row>
|
||||
<entry role="catalog_table_entry"><para role="column_definition">
|
||||
<structfield>heap_tuples_written</structfield> <type>bigint</type>
|
||||
<structfield>heap_tuples_inserted</structfield> <type>bigint</type>
|
||||
</para>
|
||||
<para>
|
||||
Number of heap tuples written.
|
||||
Number of heap tuples inserted.
|
||||
This counter only advances when the phase is
|
||||
<literal>seq scanning heap</literal>,
|
||||
<literal>index scanning heap</literal>
|
||||
or <literal>writing new heap</literal>.
|
||||
<literal>index scanning heap</literal>,
|
||||
<literal>writing new heap</literal>
|
||||
or <literal>catch-up</literal>.
|
||||
</para></entry>
|
||||
</row>
|
||||
|
||||
<row>
|
||||
<entry role="catalog_table_entry"><para role="column_definition">
|
||||
<structfield>heap_tuples_updated</structfield> <type>bigint</type>
|
||||
</para>
|
||||
<para>
|
||||
Number of heap tuples updated.
|
||||
This counter only advances when the phase is <literal>catch-up</literal>.
|
||||
</para></entry>
|
||||
</row>
|
||||
|
||||
<row>
|
||||
<entry role="catalog_table_entry"><para role="column_definition">
|
||||
<structfield>heap_tuples_deleted</structfield> <type>bigint</type>
|
||||
</para>
|
||||
<para>
|
||||
Number of heap tuples deleted.
|
||||
This counter only advances when the phase is <literal>catch-up</literal>.
|
||||
</para></entry>
|
||||
</row>
|
||||
|
||||
|
|
@ -7078,6 +7099,14 @@ FROM pg_stat_get_backend_idset() AS backendid;
|
|||
<command>REPACK</command> is currently writing the new heap.
|
||||
</entry>
|
||||
</row>
|
||||
<row>
|
||||
<entry><literal>catch-up</literal></entry>
|
||||
<entry>
|
||||
<command>REPACK CONCURRENTLY</command> is currently processing the DML
|
||||
commands that other transactions executed during any of the preceding
|
||||
phases.
|
||||
</entry>
|
||||
</row>
|
||||
<row>
|
||||
<entry><literal>swapping relation files</literal></entry>
|
||||
<entry>
|
||||
|
|
|
|||
|
|
@ -1845,15 +1845,17 @@ SELECT pg_advisory_lock(q.id) FROM
|
|||
<title>Caveats</title>
|
||||
|
||||
<para>
|
||||
Some DDL commands, currently only <link linkend="sql-truncate"><command>TRUNCATE</command></link> and the
|
||||
table-rewriting forms of <link linkend="sql-altertable"><command>ALTER TABLE</command></link>, are not
|
||||
Some commands, currently only <link linkend="sql-truncate"><command>TRUNCATE</command></link>, the
|
||||
table-rewriting forms of <link linkend="sql-altertable"><command>ALTER
|
||||
TABLE</command></link> and <command>REPACK</command> with
|
||||
the <literal>CONCURRENTLY</literal> option, are not
|
||||
MVCC-safe. This means that after the truncation or rewrite commits, the
|
||||
table will appear empty to concurrent transactions, if they are using a
|
||||
snapshot taken before the DDL command committed. This will only be an
|
||||
snapshot taken before the command committed. This will only be an
|
||||
issue for a transaction that did not access the table in question
|
||||
before the DDL command started — any transaction that has done so
|
||||
before the command started — any transaction that has done so
|
||||
would hold at least an <literal>ACCESS SHARE</literal> table lock,
|
||||
which would block the DDL command until that transaction completes.
|
||||
which would block the truncating or rewriting command until that transaction completes.
|
||||
So these commands will not cause any apparent inconsistency in the
|
||||
table contents for successive queries on the target table, but they
|
||||
could cause visible inconsistency between the contents of the target
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
|
|||
|
||||
VERBOSE [ <replaceable class="parameter">boolean</replaceable> ]
|
||||
ANALYZE [ <replaceable class="parameter">boolean</replaceable> ]
|
||||
CONCURRENTLY [ <replaceable class="parameter">boolean</replaceable> ]
|
||||
|
||||
<phrase>and <replaceable class="parameter">table_and_columns</replaceable> is:</phrase>
|
||||
|
||||
|
|
@ -54,7 +55,8 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
|
|||
processes every table and materialized view in the current database that
|
||||
the current user has the <literal>MAINTAIN</literal> privilege on. This
|
||||
form of <command>REPACK</command> cannot be executed inside a transaction
|
||||
block.
|
||||
block. Also, this form is not allowed if
|
||||
the <literal>CONCURRENTLY</literal> option is used.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
|
|
@ -67,7 +69,8 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
|
|||
When a table is being repacked, an <literal>ACCESS EXCLUSIVE</literal> lock
|
||||
is acquired on it. This prevents any other database operations (both reads
|
||||
and writes) from operating on the table until the <command>REPACK</command>
|
||||
is finished.
|
||||
is finished. If you want to keep the table accessible during the repacking,
|
||||
consider using the <literal>CONCURRENTLY</literal> option.
|
||||
</para>
|
||||
|
||||
<refsect2 id="sql-repack-notes-on-clustering" xreflabel="Notes on Clustering">
|
||||
|
|
@ -198,6 +201,117 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
|
|||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>CONCURRENTLY</literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
Allow other transactions to use the table while it is being repacked.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Internally, <command>REPACK</command> copies the contents of the table
|
||||
(ignoring dead tuples) into a new file, sorted by the specified index,
|
||||
and also creates a new file for each index. Then it swaps the old and
|
||||
new files for the table and all the indexes, and deletes the old
|
||||
files. The <literal>ACCESS EXCLUSIVE</literal> lock is needed to make
|
||||
sure that the old files do not change during the processing because the
|
||||
changes would get lost due to the swap.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
With the <literal>CONCURRENTLY</literal> option, the <literal>ACCESS
|
||||
EXCLUSIVE</literal> lock is only acquired to swap the table and index
|
||||
files. The data changes that took place during the creation of the new
|
||||
table and index files are captured using logical decoding
|
||||
(<xref linkend="logicaldecoding"/>) and applied before
|
||||
the <literal>ACCESS EXCLUSIVE</literal> lock is requested. Thus the lock
|
||||
is typically held only for the time needed to swap the files, which
|
||||
should be pretty short. However, the time might still be noticeable if
|
||||
too many data changes have been done to the table while
|
||||
<command>REPACK</command> was waiting for the lock: those changes must
|
||||
be processed just before the files are swapped, while the
|
||||
<literal>ACCESS EXCLUSIVE</literal> lock is being held.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Note that <command>REPACK</command> with the
|
||||
<literal>CONCURRENTLY</literal> option does not try to order the rows
|
||||
inserted into the table after the repacking started. Also
|
||||
note <command>REPACK</command> might fail to complete due to DDL
|
||||
commands executed on the table by other transactions during the
|
||||
repacking.
|
||||
</para>
|
||||
|
||||
<note>
|
||||
<para>
|
||||
In addition to the temporary space requirements explained in
|
||||
<xref linkend="sql-repack-notes-on-resources"/>,
|
||||
the <literal>CONCURRENTLY</literal> option can add to the usage of
|
||||
temporary space a bit more. The reason is that other transactions can
|
||||
perform DML operations which cannot be applied to the new file until
|
||||
<command>REPACK</command> has copied all the existing tuples from the
|
||||
old file. Thus the tuples inserted into the old file during the copying
|
||||
are also stored separately in a temporary file, until they can be
|
||||
processed.
|
||||
</para>
|
||||
</note>
|
||||
|
||||
<para>
|
||||
The <literal>CONCURRENTLY</literal> option cannot be used in the
|
||||
following cases:
|
||||
|
||||
<itemizedlist>
|
||||
<listitem>
|
||||
<para>
|
||||
The table is <literal>UNLOGGED</literal>.
|
||||
</para>
|
||||
</listitem>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
The table is partitioned.
|
||||
</para>
|
||||
</listitem>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
The table lacks a primary key and index-based replica identity.
|
||||
</para>
|
||||
</listitem>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
The table is a system catalog or a <acronym>TOAST</acronym> table.
|
||||
</para>
|
||||
</listitem>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
<command>REPACK</command> is executed inside a transaction block.
|
||||
</para>
|
||||
</listitem>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
|
||||
configuration parameter does not allow for creation of an additional
|
||||
replication slot.
|
||||
</para>
|
||||
</listitem>
|
||||
</itemizedlist>
|
||||
</para>
|
||||
|
||||
<warning>
|
||||
<para>
|
||||
<command>REPACK</command> with the <literal>CONCURRENTLY</literal>
|
||||
option is not MVCC-safe, see <xref linkend="mvcc-caveats"/> for
|
||||
details.
|
||||
</para>
|
||||
</warning>
|
||||
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>VERBOSE</literal></term>
|
||||
<listitem>
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ SUBDIRS = \
|
|||
interfaces \
|
||||
backend/replication/libpqwalreceiver \
|
||||
backend/replication/pgoutput \
|
||||
backend/replication/pgrepack \
|
||||
fe_utils \
|
||||
bin \
|
||||
pl \
|
||||
|
|
|
|||
|
|
@ -61,7 +61,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
|
|||
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
|
||||
Buffer newbuf, HeapTuple oldtup,
|
||||
HeapTuple newtup, HeapTuple old_key_tuple,
|
||||
bool all_visible_cleared, bool new_all_visible_cleared);
|
||||
bool all_visible_cleared, bool new_all_visible_cleared,
|
||||
bool walLogical);
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
static void check_lock_if_inplace_updateable_rel(Relation relation,
|
||||
const ItemPointerData *otid,
|
||||
|
|
@ -2716,6 +2717,7 @@ heap_delete(Relation relation, const ItemPointerData *tid,
|
|||
uint16 new_infomask,
|
||||
new_infomask2;
|
||||
bool changingPart = (options & TABLE_DELETE_CHANGING_PARTITION) != 0;
|
||||
bool walLogical = (options & TABLE_DELETE_NO_LOGICAL) == 0;
|
||||
bool have_tuple_lock = false;
|
||||
bool iscombo;
|
||||
bool all_visible_cleared = false;
|
||||
|
|
@ -2950,7 +2952,8 @@ l1:
|
|||
* Compute replica identity tuple before entering the critical section so
|
||||
* we don't PANIC upon a memory allocation failure.
|
||||
*/
|
||||
old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
|
||||
old_key_tuple = walLogical ?
|
||||
ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL;
|
||||
|
||||
/*
|
||||
* If this is the first possibly-multixact-able operation in the current
|
||||
|
|
@ -3040,6 +3043,16 @@ l1:
|
|||
xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
|
||||
}
|
||||
|
||||
/*
|
||||
* Mark the change as not-for-logical-decoding if caller requested so.
|
||||
*
|
||||
* (This is used for changes that affect relations not visible to
|
||||
* other transactions, such as the transient table during concurrent
|
||||
* repack.)
|
||||
*/
|
||||
if (!walLogical)
|
||||
xlrec.flags |= XLH_DELETE_NO_LOGICAL;
|
||||
|
||||
XLogBeginInsert();
|
||||
XLogRegisterData(&xlrec, SizeOfHeapDelete);
|
||||
|
||||
|
|
@ -3190,6 +3203,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
|
|||
HeapTuple heaptup;
|
||||
HeapTuple old_key_tuple = NULL;
|
||||
bool old_key_copied = false;
|
||||
bool walLogical = (options & TABLE_UPDATE_NO_LOGICAL) == 0;
|
||||
Page page,
|
||||
newpage;
|
||||
BlockNumber block;
|
||||
|
|
@ -4071,7 +4085,8 @@ l2:
|
|||
newbuf, &oldtup, heaptup,
|
||||
old_key_tuple,
|
||||
all_visible_cleared,
|
||||
all_visible_cleared_new);
|
||||
all_visible_cleared_new,
|
||||
walLogical);
|
||||
if (newbuf != buffer)
|
||||
{
|
||||
PageSetLSN(newpage, recptr);
|
||||
|
|
@ -8747,7 +8762,8 @@ static XLogRecPtr
|
|||
log_heap_update(Relation reln, Buffer oldbuf,
|
||||
Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
|
||||
HeapTuple old_key_tuple,
|
||||
bool all_visible_cleared, bool new_all_visible_cleared)
|
||||
bool all_visible_cleared, bool new_all_visible_cleared,
|
||||
bool walLogical)
|
||||
{
|
||||
xl_heap_update xlrec;
|
||||
xl_heap_header xlhdr;
|
||||
|
|
@ -8758,7 +8774,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
|
|||
suffixlen = 0;
|
||||
XLogRecPtr recptr;
|
||||
Page page = BufferGetPage(newbuf);
|
||||
bool need_tuple_data = RelationIsLogicallyLogged(reln);
|
||||
bool need_tuple_data = walLogical && RelationIsLogicallyLogged(reln);
|
||||
bool init;
|
||||
int bufflags;
|
||||
|
||||
|
|
|
|||
|
|
@ -51,6 +51,11 @@
|
|||
static void reform_and_rewrite_tuple(HeapTuple tuple,
|
||||
Relation OldHeap, Relation NewHeap,
|
||||
Datum *values, bool *isnull, RewriteState rwstate);
|
||||
static void heap_insert_for_repack(HeapTuple tuple, Relation OldHeap,
|
||||
Relation NewHeap, Datum *values, bool *isnull,
|
||||
BulkInsertState bistate);
|
||||
static HeapTuple reform_tuple(HeapTuple tuple, Relation OldHeap,
|
||||
Relation NewHeap, Datum *values, bool *isnull);
|
||||
|
||||
static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
|
||||
HeapTuple tuple,
|
||||
|
|
@ -589,6 +594,7 @@ static void
|
|||
heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
||||
Relation OldIndex, bool use_sort,
|
||||
TransactionId OldestXmin,
|
||||
Snapshot snapshot,
|
||||
TransactionId *xid_cutoff,
|
||||
MultiXactId *multi_cutoff,
|
||||
double *num_tuples,
|
||||
|
|
@ -596,6 +602,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
double *tups_recently_dead)
|
||||
{
|
||||
RewriteState rwstate;
|
||||
BulkInsertState bistate;
|
||||
IndexScanDesc indexScan;
|
||||
TableScanDesc tableScan;
|
||||
HeapScanDesc heapScan;
|
||||
|
|
@ -609,6 +616,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
bool *isnull;
|
||||
BufferHeapTupleTableSlot *hslot;
|
||||
BlockNumber prev_cblock = InvalidBlockNumber;
|
||||
bool concurrent = snapshot != NULL;
|
||||
|
||||
/* Remember if it's a system catalog */
|
||||
is_system_catalog = IsSystemRelation(OldHeap);
|
||||
|
|
@ -624,10 +632,21 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
values = palloc_array(Datum, natts);
|
||||
isnull = palloc_array(bool, natts);
|
||||
|
||||
/* Initialize the rewrite operation */
|
||||
rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
|
||||
*multi_cutoff);
|
||||
/*
|
||||
* In non-concurrent mode, initialize the rewrite operation. This is not
|
||||
* needed in concurrent mode.
|
||||
*/
|
||||
if (!concurrent)
|
||||
rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin,
|
||||
*xid_cutoff, *multi_cutoff);
|
||||
else
|
||||
rwstate = NULL;
|
||||
|
||||
/* In concurrent mode, prepare for bulk-insert operation. */
|
||||
if (concurrent)
|
||||
bistate = GetBulkInsertState();
|
||||
else
|
||||
bistate = NULL;
|
||||
|
||||
/* Set up sorting if wanted */
|
||||
if (use_sort)
|
||||
|
|
@ -641,6 +660,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
* Prepare to scan the OldHeap. To ensure we see recently-dead tuples
|
||||
* that still need to be copied, we scan with SnapshotAny and use
|
||||
* HeapTupleSatisfiesVacuum for the visibility test.
|
||||
*
|
||||
* In the CONCURRENTLY case, we do regular MVCC visibility tests, using
|
||||
* the snapshot passed by the caller.
|
||||
*/
|
||||
if (OldIndex != NULL && !use_sort)
|
||||
{
|
||||
|
|
@ -657,7 +679,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
|
||||
tableScan = NULL;
|
||||
heapScan = NULL;
|
||||
indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, 0,
|
||||
indexScan = index_beginscan(OldHeap, OldIndex,
|
||||
snapshot ? snapshot : SnapshotAny,
|
||||
NULL, 0, 0,
|
||||
SO_NONE);
|
||||
index_rescan(indexScan, NULL, 0, NULL, 0);
|
||||
}
|
||||
|
|
@ -667,7 +691,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
|
||||
PROGRESS_REPACK_PHASE_SEQ_SCAN_HEAP);
|
||||
|
||||
tableScan = table_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL,
|
||||
tableScan = table_beginscan(OldHeap,
|
||||
snapshot ? snapshot : SnapshotAny,
|
||||
0, (ScanKey) NULL,
|
||||
SO_NONE);
|
||||
heapScan = (HeapScanDesc) tableScan;
|
||||
indexScan = NULL;
|
||||
|
|
@ -744,83 +770,94 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
buf = hslot->buffer;
|
||||
|
||||
/*
|
||||
* To be able to guarantee that we can set the hint bit, acquire an
|
||||
* exclusive lock on the old buffer. We need the hint bits, set in
|
||||
* heapam_relation_copy_for_cluster() -> HeapTupleSatisfiesVacuum(),
|
||||
* to be set, as otherwise reform_and_rewrite_tuple() ->
|
||||
* rewrite_heap_tuple() will get confused. Specifically,
|
||||
* rewrite_heap_tuple() checks for HEAP_XMAX_INVALID in the old tuple
|
||||
* to determine whether to check the old-to-new mapping hash table.
|
||||
*
|
||||
* It'd be better if we somehow could avoid setting hint bits on the
|
||||
* old page. One reason to use VACUUM FULL are very bloated tables -
|
||||
* rewriting most of the old table during VACUUM FULL doesn't exactly
|
||||
* help...
|
||||
* In concurrent mode, our table or index scan has used regular MVCC
|
||||
* visibility test against a snapshot passed by caller; therefore we
|
||||
* don't need another visibility test. In non-concurrent mode
|
||||
* however, we must test the visibility of each tuple we read.
|
||||
*/
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
|
||||
switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))
|
||||
if (!concurrent)
|
||||
{
|
||||
case HEAPTUPLE_DEAD:
|
||||
/* Definitely dead */
|
||||
isdead = true;
|
||||
break;
|
||||
case HEAPTUPLE_RECENTLY_DEAD:
|
||||
*tups_recently_dead += 1;
|
||||
pg_fallthrough;
|
||||
case HEAPTUPLE_LIVE:
|
||||
/* Live or recently dead, must copy it */
|
||||
isdead = false;
|
||||
break;
|
||||
case HEAPTUPLE_INSERT_IN_PROGRESS:
|
||||
/*
|
||||
* To be able to guarantee that we can set the hint bit, acquire
|
||||
* an exclusive lock on the old buffer. We need the hint bits, set
|
||||
* in heapam_relation_copy_for_cluster() ->
|
||||
* HeapTupleSatisfiesVacuum(), to be set, as otherwise
|
||||
* reform_and_rewrite_tuple() -> rewrite_heap_tuple() will get
|
||||
* confused. Specifically, rewrite_heap_tuple() checks for
|
||||
* HEAP_XMAX_INVALID in the old tuple to determine whether to
|
||||
* check the old-to-new mapping hash table.
|
||||
*
|
||||
* It'd be better if we somehow could avoid setting hint bits on
|
||||
* the old page. One reason to use VACUUM FULL are very bloated
|
||||
* tables - rewriting most of the old table during VACUUM FULL
|
||||
* doesn't exactly help...
|
||||
*/
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
|
||||
/*
|
||||
* Since we hold exclusive lock on the relation, normally the
|
||||
* only way to see this is if it was inserted earlier in our
|
||||
* own transaction. However, it can happen in system
|
||||
* catalogs, since we tend to release write lock before commit
|
||||
* there. Give a warning if neither case applies; but in any
|
||||
* case we had better copy it.
|
||||
*/
|
||||
if (!is_system_catalog &&
|
||||
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data)))
|
||||
elog(WARNING, "concurrent insert in progress within table \"%s\"",
|
||||
RelationGetRelationName(OldHeap));
|
||||
/* treat as live */
|
||||
isdead = false;
|
||||
break;
|
||||
case HEAPTUPLE_DELETE_IN_PROGRESS:
|
||||
|
||||
/*
|
||||
* Similar situation to INSERT_IN_PROGRESS case.
|
||||
*/
|
||||
if (!is_system_catalog &&
|
||||
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data)))
|
||||
elog(WARNING, "concurrent delete in progress within table \"%s\"",
|
||||
RelationGetRelationName(OldHeap));
|
||||
/* treat as recently dead */
|
||||
*tups_recently_dead += 1;
|
||||
isdead = false;
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
|
||||
isdead = false; /* keep compiler quiet */
|
||||
break;
|
||||
}
|
||||
|
||||
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
|
||||
|
||||
if (isdead)
|
||||
{
|
||||
*tups_vacuumed += 1;
|
||||
/* heap rewrite module still needs to see it... */
|
||||
if (rewrite_heap_dead_tuple(rwstate, tuple))
|
||||
switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))
|
||||
{
|
||||
/* A previous recently-dead tuple is now known dead */
|
||||
*tups_vacuumed += 1;
|
||||
*tups_recently_dead -= 1;
|
||||
case HEAPTUPLE_DEAD:
|
||||
/* Definitely dead */
|
||||
isdead = true;
|
||||
break;
|
||||
case HEAPTUPLE_RECENTLY_DEAD:
|
||||
*tups_recently_dead += 1;
|
||||
pg_fallthrough;
|
||||
case HEAPTUPLE_LIVE:
|
||||
/* Live or recently dead, must copy it */
|
||||
isdead = false;
|
||||
break;
|
||||
case HEAPTUPLE_INSERT_IN_PROGRESS:
|
||||
|
||||
/*
|
||||
* As long as we hold exclusive lock on the relation,
|
||||
* normally the only way to see this is if it was inserted
|
||||
* earlier in our own transaction. However, it can happen
|
||||
* in system catalogs, since we tend to release write lock
|
||||
* before commit there. Give a warning if neither case
|
||||
* applies; but in any case we had better copy it.
|
||||
*/
|
||||
if (!is_system_catalog &&
|
||||
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data)))
|
||||
elog(WARNING, "concurrent insert in progress within table \"%s\"",
|
||||
RelationGetRelationName(OldHeap));
|
||||
/* treat as live */
|
||||
isdead = false;
|
||||
break;
|
||||
case HEAPTUPLE_DELETE_IN_PROGRESS:
|
||||
|
||||
/*
|
||||
* Similar situation to INSERT_IN_PROGRESS case.
|
||||
*/
|
||||
if (!is_system_catalog &&
|
||||
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data)))
|
||||
elog(WARNING, "concurrent delete in progress within table \"%s\"",
|
||||
RelationGetRelationName(OldHeap));
|
||||
/* treat as recently dead */
|
||||
*tups_recently_dead += 1;
|
||||
isdead = false;
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
|
||||
isdead = false; /* keep compiler quiet */
|
||||
break;
|
||||
}
|
||||
|
||||
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
|
||||
|
||||
if (isdead)
|
||||
{
|
||||
*tups_vacuumed += 1;
|
||||
/* heap rewrite module still needs to see it... */
|
||||
if (rewrite_heap_dead_tuple(rwstate, tuple))
|
||||
{
|
||||
/* A previous recently-dead tuple is now known dead */
|
||||
*tups_vacuumed += 1;
|
||||
*tups_recently_dead -= 1;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
*num_tuples += 1;
|
||||
|
|
@ -839,12 +876,16 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
{
|
||||
const int ct_index[] = {
|
||||
PROGRESS_REPACK_HEAP_TUPLES_SCANNED,
|
||||
PROGRESS_REPACK_HEAP_TUPLES_WRITTEN
|
||||
PROGRESS_REPACK_HEAP_TUPLES_INSERTED
|
||||
};
|
||||
int64 ct_val[2];
|
||||
|
||||
reform_and_rewrite_tuple(tuple, OldHeap, NewHeap,
|
||||
values, isnull, rwstate);
|
||||
if (!concurrent)
|
||||
reform_and_rewrite_tuple(tuple, OldHeap, NewHeap,
|
||||
values, isnull, rwstate);
|
||||
else
|
||||
heap_insert_for_repack(tuple, OldHeap, NewHeap,
|
||||
values, isnull, bistate);
|
||||
|
||||
/*
|
||||
* In indexscan mode and also VACUUM FULL, report increase in
|
||||
|
|
@ -892,12 +933,17 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
break;
|
||||
|
||||
n_tuples += 1;
|
||||
reform_and_rewrite_tuple(tuple,
|
||||
OldHeap, NewHeap,
|
||||
values, isnull,
|
||||
rwstate);
|
||||
if (!concurrent)
|
||||
reform_and_rewrite_tuple(tuple,
|
||||
OldHeap, NewHeap,
|
||||
values, isnull,
|
||||
rwstate);
|
||||
else
|
||||
heap_insert_for_repack(tuple, OldHeap, NewHeap,
|
||||
values, isnull, bistate);
|
||||
|
||||
/* Report n_tuples */
|
||||
pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_WRITTEN,
|
||||
pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED,
|
||||
n_tuples);
|
||||
}
|
||||
|
||||
|
|
@ -905,7 +951,10 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
}
|
||||
|
||||
/* Write out any remaining tuples, and fsync if needed */
|
||||
end_heap_rewrite(rwstate);
|
||||
if (rwstate)
|
||||
end_heap_rewrite(rwstate);
|
||||
if (bistate)
|
||||
FreeBulkInsertState(bistate);
|
||||
|
||||
/* Clean up */
|
||||
pfree(values);
|
||||
|
|
@ -2303,27 +2352,84 @@ static void
|
|||
reform_and_rewrite_tuple(HeapTuple tuple,
|
||||
Relation OldHeap, Relation NewHeap,
|
||||
Datum *values, bool *isnull, RewriteState rwstate)
|
||||
{
|
||||
HeapTuple newtuple;
|
||||
|
||||
newtuple = reform_tuple(tuple, OldHeap, NewHeap, values, isnull);
|
||||
|
||||
/* The heap rewrite module does the rest */
|
||||
rewrite_heap_tuple(rwstate, tuple, newtuple);
|
||||
|
||||
heap_freetuple(newtuple);
|
||||
}
|
||||
|
||||
/*
|
||||
* Insert tuple when processing REPACK CONCURRENTLY.
|
||||
*
|
||||
* rewriteheap.c is not used in the CONCURRENTLY case because it'd be
|
||||
* difficult to do the same in the catch-up phase (as the logical
|
||||
* decoding does not provide us with sufficient visibility
|
||||
* information). Thus we must use heap_insert() both during the
|
||||
* catch-up and here.
|
||||
*
|
||||
* We pass the NO_LOGICAL flag to heap_insert() in order to skip logical
|
||||
* decoding: as soon as REPACK CONCURRENTLY swaps the relation files, it drops
|
||||
* this relation, so no logical replication subscription should need the data.
|
||||
*
|
||||
* BulkInsertState is used because many tuples are inserted in the typical
|
||||
* case.
|
||||
*/
|
||||
static void
|
||||
heap_insert_for_repack(HeapTuple tuple, Relation OldHeap, Relation NewHeap,
|
||||
Datum *values, bool *isnull, BulkInsertState bistate)
|
||||
{
|
||||
HeapTuple newtuple;
|
||||
|
||||
newtuple = reform_tuple(tuple, OldHeap, NewHeap, values, isnull);
|
||||
|
||||
heap_insert(NewHeap, newtuple, GetCurrentCommandId(true),
|
||||
HEAP_INSERT_NO_LOGICAL, bistate);
|
||||
|
||||
heap_freetuple(newtuple);
|
||||
}
|
||||
|
||||
/*
|
||||
* Subroutine for reform_and_rewrite_tuple and heap_insert_for_repack.
|
||||
*
|
||||
* Deform the given tuple, set values of dropped columns to NULL, form a new
|
||||
* tuple and return it. If no attributes need to be changed in this way, a
|
||||
* copy of the original tuple is returned. Caller is responsible for freeing
|
||||
* the returned tuple.
|
||||
*
|
||||
* XXX this coding assumes that both relations have the same tupledesc.
|
||||
*/
|
||||
static HeapTuple
|
||||
reform_tuple(HeapTuple tuple, Relation OldHeap, Relation NewHeap,
|
||||
Datum *values, bool *isnull)
|
||||
{
|
||||
TupleDesc oldTupDesc = RelationGetDescr(OldHeap);
|
||||
TupleDesc newTupDesc = RelationGetDescr(NewHeap);
|
||||
HeapTuple copiedTuple;
|
||||
int i;
|
||||
bool needs_reform = false;
|
||||
|
||||
/* Skip work if the tuple doesn't need any attributes changed */
|
||||
for (int i = 0; i < newTupDesc->natts; i++)
|
||||
{
|
||||
if (TupleDescCompactAttr(newTupDesc, i)->attisdropped &&
|
||||
!heap_attisnull(tuple, i + 1, newTupDesc))
|
||||
needs_reform = true;
|
||||
}
|
||||
if (!needs_reform)
|
||||
return heap_copytuple(tuple);
|
||||
|
||||
heap_deform_tuple(tuple, oldTupDesc, values, isnull);
|
||||
|
||||
/* Be sure to null out any dropped columns */
|
||||
for (i = 0; i < newTupDesc->natts; i++)
|
||||
for (int i = 0; i < newTupDesc->natts; i++)
|
||||
{
|
||||
if (TupleDescCompactAttr(newTupDesc, i)->attisdropped)
|
||||
isnull[i] = true;
|
||||
}
|
||||
|
||||
copiedTuple = heap_form_tuple(newTupDesc, values, isnull);
|
||||
|
||||
/* The heap rewrite module does the rest */
|
||||
rewrite_heap_tuple(rwstate, tuple, copiedTuple);
|
||||
|
||||
heap_freetuple(copiedTuple);
|
||||
return heap_form_tuple(newTupDesc, values, isnull);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -621,9 +621,9 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
|
|||
uint32 options = HEAP_INSERT_SKIP_FSM;
|
||||
|
||||
/*
|
||||
* While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
|
||||
* for the TOAST table are not logically decoded. The main heap is
|
||||
* WAL-logged as XLOG FPI records, which are not logically decoded.
|
||||
* While rewriting the heap for REPACK, make sure data for the TOAST
|
||||
* table are not logically decoded. The main heap is WAL-logged as
|
||||
* XLOG FPI records, which are not logically decoded.
|
||||
*/
|
||||
options |= HEAP_INSERT_NO_LOGICAL;
|
||||
|
||||
|
|
|
|||
|
|
@ -1343,16 +1343,19 @@ CREATE VIEW pg_stat_progress_repack AS
|
|||
WHEN 2 THEN 'index scanning heap'
|
||||
WHEN 3 THEN 'sorting tuples'
|
||||
WHEN 4 THEN 'writing new heap'
|
||||
WHEN 5 THEN 'swapping relation files'
|
||||
WHEN 6 THEN 'rebuilding index'
|
||||
WHEN 7 THEN 'performing final cleanup'
|
||||
WHEN 5 THEN 'catch-up'
|
||||
WHEN 6 THEN 'swapping relation files'
|
||||
WHEN 7 THEN 'rebuilding index'
|
||||
WHEN 8 THEN 'performing final cleanup'
|
||||
END AS phase,
|
||||
CAST(S.param3 AS oid) AS repack_index_relid,
|
||||
S.param4 AS heap_tuples_scanned,
|
||||
S.param5 AS heap_tuples_written,
|
||||
S.param6 AS heap_blks_total,
|
||||
S.param7 AS heap_blks_scanned,
|
||||
S.param8 AS index_rebuild_count
|
||||
S.param5 AS heap_tuples_inserted,
|
||||
S.param6 AS heap_tuples_updated,
|
||||
S.param7 AS heap_tuples_deleted,
|
||||
S.param8 AS heap_blks_total,
|
||||
S.param9 AS heap_blks_scanned,
|
||||
S.param10 AS index_rebuild_count
|
||||
FROM pg_stat_get_progress_info('REPACK') AS S
|
||||
LEFT JOIN pg_database D ON S.datid = D.oid;
|
||||
|
||||
|
|
@ -1370,7 +1373,7 @@ CREATE VIEW pg_stat_progress_cluster AS
|
|||
phase,
|
||||
repack_index_relid AS cluster_index_relid,
|
||||
heap_tuples_scanned,
|
||||
heap_tuples_written,
|
||||
heap_tuples_inserted + heap_tuples_updated AS heap_tuples_written,
|
||||
heap_blks_total,
|
||||
heap_blks_scanned,
|
||||
index_rebuild_count
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@ OBJS = \
|
|||
propgraphcmds.o \
|
||||
publicationcmds.o \
|
||||
repack.o \
|
||||
repack_worker.o \
|
||||
schemacmds.o \
|
||||
seclabel.o \
|
||||
sequence.o \
|
||||
|
|
|
|||
|
|
@ -893,6 +893,7 @@ static void
|
|||
refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
|
||||
{
|
||||
finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
|
||||
true, /* reindex */
|
||||
RecentXmin, ReadNextMultiXactId(), relpersistence);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ backend_sources += files(
|
|||
'propgraphcmds.c',
|
||||
'publicationcmds.c',
|
||||
'repack.c',
|
||||
'repack_worker.c',
|
||||
'schemacmds.c',
|
||||
'seclabel.c',
|
||||
'sequence.c',
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
533
src/backend/commands/repack_worker.c
Normal file
533
src/backend/commands/repack_worker.c
Normal file
|
|
@ -0,0 +1,533 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* repack_worker.c
|
||||
* Implementation of the background worker for ad-hoc logical decoding
|
||||
* during REPACK (CONCURRENTLY).
|
||||
*
|
||||
*
|
||||
* Copyright (c) 2026, PostgreSQL Global Development Group
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/commands/repack_worker.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/table.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "access/xlogwait.h"
|
||||
#include "commands/repack.h"
|
||||
#include "commands/repack_internal.h"
|
||||
#include "libpq/pqmq.h"
|
||||
#include "replication/snapbuild.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/proc.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
#define REPL_PLUGIN_NAME "pgrepack"
|
||||
|
||||
static void RepackWorkerShutdown(int code, Datum arg);
|
||||
static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
|
||||
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
|
||||
static void export_initial_snapshot(Snapshot snapshot,
|
||||
DecodingWorkerShared *shared);
|
||||
static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
|
||||
DecodingWorkerShared *shared);
|
||||
|
||||
/* Is this process a REPACK worker? */
|
||||
static bool am_repack_worker = false;
|
||||
|
||||
/* The WAL segment being decoded. */
|
||||
static XLogSegNo repack_current_segment = 0;
|
||||
|
||||
/*
|
||||
* Keep track of the table we're processing, to skip logical decoding of data
|
||||
* from other relations.
|
||||
*/
|
||||
static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
|
||||
static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
|
||||
|
||||
|
||||
/* REPACK decoding worker entry point */
|
||||
void
|
||||
RepackWorkerMain(Datum main_arg)
|
||||
{
|
||||
dsm_segment *seg;
|
||||
DecodingWorkerShared *shared;
|
||||
shm_mq *mq;
|
||||
shm_mq_handle *mqh;
|
||||
LogicalDecodingContext *decoding_ctx;
|
||||
SharedFileSet *sfs;
|
||||
Snapshot snapshot;
|
||||
|
||||
am_repack_worker = true;
|
||||
|
||||
/*
|
||||
* Override the default bgworker_die() with die() so we can use
|
||||
* CHECK_FOR_INTERRUPTS().
|
||||
*/
|
||||
pqsignal(SIGTERM, die);
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
seg = dsm_attach(DatumGetUInt32(main_arg));
|
||||
if (seg == NULL)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("could not map dynamic shared memory segment"));
|
||||
|
||||
shared = (DecodingWorkerShared *) dsm_segment_address(seg);
|
||||
shared->dsm_seg = seg;
|
||||
|
||||
/* Arrange to signal the leader if we exit. */
|
||||
before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
|
||||
|
||||
/*
|
||||
* Join locking group - see the comments around the call of
|
||||
* start_repack_decoding_worker().
|
||||
*/
|
||||
if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
|
||||
return; /* The leader is not running anymore. */
|
||||
|
||||
/*
|
||||
* Setup a queue to send error messages to the backend that launched this
|
||||
* worker.
|
||||
*/
|
||||
mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
|
||||
shm_mq_set_sender(mq, MyProc);
|
||||
mqh = shm_mq_attach(mq, seg, NULL);
|
||||
pq_redirect_to_shm_mq(seg, mqh);
|
||||
pq_set_parallel_leader(shared->backend_pid,
|
||||
shared->backend_proc_number);
|
||||
|
||||
/* Connect to the database. */
|
||||
BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid, 0);
|
||||
|
||||
/*
|
||||
* Transaction is needed to open relation, and it also provides us with a
|
||||
* resource owner.
|
||||
*/
|
||||
StartTransactionCommand();
|
||||
|
||||
shared = (DecodingWorkerShared *) dsm_segment_address(seg);
|
||||
|
||||
/*
|
||||
* Not sure the spinlock is needed here - the backend should not change
|
||||
* anything in the shared memory until we have serialized the snapshot.
|
||||
*/
|
||||
SpinLockAcquire(&shared->mutex);
|
||||
Assert(!XLogRecPtrIsValid(shared->lsn_upto));
|
||||
sfs = &shared->sfs;
|
||||
SpinLockRelease(&shared->mutex);
|
||||
|
||||
SharedFileSetAttach(sfs, seg);
|
||||
|
||||
/*
|
||||
* Prepare to capture the concurrent data changes ourselves.
|
||||
*/
|
||||
decoding_ctx = repack_setup_logical_decoding(shared->relid);
|
||||
|
||||
/* Announce that we're ready. */
|
||||
SpinLockAcquire(&shared->mutex);
|
||||
shared->initialized = true;
|
||||
SpinLockRelease(&shared->mutex);
|
||||
ConditionVariableSignal(&shared->cv);
|
||||
|
||||
/* There doesn't seem to a nice API to set these */
|
||||
XactIsoLevel = XACT_REPEATABLE_READ;
|
||||
XactReadOnly = true;
|
||||
|
||||
/* Build the initial snapshot and export it. */
|
||||
snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
|
||||
export_initial_snapshot(snapshot, shared);
|
||||
|
||||
/*
|
||||
* Only historic snapshots should be used now. Do not let us restrict the
|
||||
* progress of xmin horizon.
|
||||
*/
|
||||
InvalidateCatalogSnapshot();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
bool stop = decode_concurrent_changes(decoding_ctx, shared);
|
||||
|
||||
if (stop)
|
||||
break;
|
||||
|
||||
}
|
||||
|
||||
/* Cleanup. */
|
||||
repack_cleanup_logical_decoding(decoding_ctx);
|
||||
CommitTransactionCommand();
|
||||
}
|
||||
|
||||
/*
|
||||
* See ParallelWorkerShutdown for details.
|
||||
*/
|
||||
static void
|
||||
RepackWorkerShutdown(int code, Datum arg)
|
||||
{
|
||||
DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
|
||||
|
||||
SendProcSignal(shared->backend_pid,
|
||||
PROCSIG_REPACK_MESSAGE,
|
||||
shared->backend_proc_number);
|
||||
|
||||
dsm_detach(shared->dsm_seg);
|
||||
}
|
||||
|
||||
bool
|
||||
AmRepackWorker(void)
|
||||
{
|
||||
return am_repack_worker;
|
||||
}
|
||||
|
||||
/*
|
||||
* This function is much like pg_create_logical_replication_slot() except that
|
||||
* the new slot is neither released (if anyone else could read changes from
|
||||
* our slot, we could miss changes other backends do while we copy the
|
||||
* existing data into temporary table), nor persisted (it's easier to handle
|
||||
* crash by restarting all the work from scratch).
|
||||
*/
|
||||
static LogicalDecodingContext *
|
||||
repack_setup_logical_decoding(Oid relid)
|
||||
{
|
||||
Relation rel;
|
||||
Oid toastrelid;
|
||||
LogicalDecodingContext *ctx;
|
||||
NameData slotname;
|
||||
RepackDecodingState *dstate;
|
||||
MemoryContext oldcxt;
|
||||
|
||||
/*
|
||||
* REPACK CONCURRENTLY is not allowed in a transaction block, so this
|
||||
* should never fire.
|
||||
*/
|
||||
Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
|
||||
|
||||
/*
|
||||
* Make sure we can use logical decoding.
|
||||
*/
|
||||
CheckSlotPermissions();
|
||||
CheckLogicalDecodingRequirements();
|
||||
|
||||
/*
|
||||
* A single backend should not execute multiple REPACK commands at a time,
|
||||
* so use PID to make the slot unique.
|
||||
*
|
||||
* RS_TEMPORARY so that the slot gets cleaned up on ERROR.
|
||||
*/
|
||||
snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
|
||||
ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
|
||||
false);
|
||||
|
||||
EnsureLogicalDecodingEnabled();
|
||||
|
||||
/*
|
||||
* Neither prepare_write nor do_write callback nor update_progress is
|
||||
* useful for us.
|
||||
*/
|
||||
ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
|
||||
NIL,
|
||||
true,
|
||||
InvalidXLogRecPtr,
|
||||
XL_ROUTINE(.page_read = read_local_xlog_page,
|
||||
.segment_open = wal_segment_open,
|
||||
.segment_close = wal_segment_close),
|
||||
NULL, NULL, NULL);
|
||||
|
||||
/*
|
||||
* We don't have control on setting fast_forward, so at least check it.
|
||||
*/
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
/* Avoid logical decoding of other relations. */
|
||||
rel = table_open(relid, AccessShareLock);
|
||||
repacked_rel_locator = rel->rd_locator;
|
||||
toastrelid = rel->rd_rel->reltoastrelid;
|
||||
if (OidIsValid(toastrelid))
|
||||
{
|
||||
Relation toastrel;
|
||||
|
||||
/* Avoid logical decoding of other TOAST relations. */
|
||||
toastrel = table_open(toastrelid, AccessShareLock);
|
||||
repacked_rel_toast_locator = toastrel->rd_locator;
|
||||
table_close(toastrel, AccessShareLock);
|
||||
}
|
||||
table_close(rel, AccessShareLock);
|
||||
|
||||
DecodingContextFindStartpoint(ctx);
|
||||
|
||||
/*
|
||||
* decode_concurrent_changes() needs non-blocking callback.
|
||||
*/
|
||||
ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
|
||||
|
||||
/* Some WAL records should have been read. */
|
||||
Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr);
|
||||
|
||||
/*
|
||||
* Initialize repack_current_segment so that we can notice WAL segment
|
||||
* boundaries.
|
||||
*/
|
||||
XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
|
||||
wal_segment_size);
|
||||
|
||||
/* Our private state belongs to the decoding context. */
|
||||
oldcxt = MemoryContextSwitchTo(ctx->context);
|
||||
|
||||
/*
|
||||
* read_local_xlog_page_no_wait() needs to be able to indicate the end of
|
||||
* WAL.
|
||||
*/
|
||||
ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
|
||||
dstate = palloc0_object(RepackDecodingState);
|
||||
MemoryContextSwitchTo(oldcxt);
|
||||
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
dstate->relid = relid;
|
||||
#endif
|
||||
|
||||
dstate->change_cxt = AllocSetContextCreate(ctx->context,
|
||||
"REPACK - change",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
/* The file will be set as soon as we have it opened. */
|
||||
dstate->file = NULL;
|
||||
|
||||
/*
|
||||
* Memory context and resource owner for long-lived resources.
|
||||
*/
|
||||
dstate->worker_cxt = CurrentMemoryContext;
|
||||
dstate->worker_resowner = CurrentResourceOwner;
|
||||
|
||||
ctx->output_writer_private = dstate;
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
static void
|
||||
repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
|
||||
{
|
||||
RepackDecodingState *dstate;
|
||||
|
||||
dstate = (RepackDecodingState *) ctx->output_writer_private;
|
||||
if (dstate->slot)
|
||||
ExecDropSingleTupleTableSlot(dstate->slot);
|
||||
|
||||
FreeDecodingContext(ctx);
|
||||
ReplicationSlotDropAcquired();
|
||||
}
|
||||
|
||||
/*
|
||||
* Make snapshot available to the backend that launched the decoding worker.
|
||||
*/
|
||||
static void
|
||||
export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
|
||||
{
|
||||
char fname[MAXPGPATH];
|
||||
BufFile *file;
|
||||
Size snap_size;
|
||||
char *snap_space;
|
||||
|
||||
snap_size = EstimateSnapshotSpace(snapshot);
|
||||
snap_space = (char *) palloc(snap_size);
|
||||
SerializeSnapshot(snapshot, snap_space);
|
||||
|
||||
DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
|
||||
file = BufFileCreateFileSet(&shared->sfs.fs, fname);
|
||||
/* To make restoration easier, write the snapshot size first. */
|
||||
BufFileWrite(file, &snap_size, sizeof(snap_size));
|
||||
BufFileWrite(file, snap_space, snap_size);
|
||||
pfree(snap_space);
|
||||
BufFileClose(file);
|
||||
|
||||
/* Increase the counter to tell the backend that the file is available. */
|
||||
SpinLockAcquire(&shared->mutex);
|
||||
shared->last_exported++;
|
||||
SpinLockRelease(&shared->mutex);
|
||||
ConditionVariableSignal(&shared->cv);
|
||||
}
|
||||
|
||||
/*
|
||||
* Decode logical changes from the WAL sequence and store them to a file.
|
||||
*
|
||||
* If true is returned, there is no more work for the worker.
|
||||
*/
|
||||
static bool
|
||||
decode_concurrent_changes(LogicalDecodingContext *ctx,
|
||||
DecodingWorkerShared *shared)
|
||||
{
|
||||
RepackDecodingState *dstate;
|
||||
XLogRecPtr lsn_upto;
|
||||
bool done;
|
||||
char fname[MAXPGPATH];
|
||||
|
||||
dstate = (RepackDecodingState *) ctx->output_writer_private;
|
||||
|
||||
/* Open the output file. */
|
||||
DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
|
||||
dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
|
||||
|
||||
SpinLockAcquire(&shared->mutex);
|
||||
lsn_upto = shared->lsn_upto;
|
||||
done = shared->done;
|
||||
SpinLockRelease(&shared->mutex);
|
||||
|
||||
while (true)
|
||||
{
|
||||
XLogRecord *record;
|
||||
XLogSegNo segno_new;
|
||||
char *errm = NULL;
|
||||
XLogRecPtr end_lsn;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
record = XLogReadRecord(ctx->reader, &errm);
|
||||
if (record)
|
||||
{
|
||||
LogicalDecodingProcessRecord(ctx, ctx->reader);
|
||||
|
||||
/*
|
||||
* If WAL segment boundary has been crossed, inform the decoding
|
||||
* system that the catalog_xmin can advance.
|
||||
*/
|
||||
end_lsn = ctx->reader->EndRecPtr;
|
||||
XLByteToSeg(end_lsn, segno_new, wal_segment_size);
|
||||
if (segno_new != repack_current_segment)
|
||||
{
|
||||
LogicalConfirmReceivedLocation(end_lsn);
|
||||
elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
|
||||
(uint32) (end_lsn >> 32), (uint32) end_lsn);
|
||||
repack_current_segment = segno_new;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ReadLocalXLogPageNoWaitPrivate *priv;
|
||||
|
||||
if (errm)
|
||||
ereport(ERROR,
|
||||
errmsg("%s", errm));
|
||||
|
||||
/*
|
||||
* In the decoding loop we do not want to get blocked when there
|
||||
* is no more WAL available, otherwise the loop would become
|
||||
* uninterruptible.
|
||||
*/
|
||||
priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
|
||||
if (priv->end_of_wal)
|
||||
/* Do not miss the end of WAL condition next time. */
|
||||
priv->end_of_wal = false;
|
||||
else
|
||||
ereport(ERROR,
|
||||
errmsg("could not read WAL record"));
|
||||
}
|
||||
|
||||
/*
|
||||
* Whether we could read new record or not, keep checking if
|
||||
* 'lsn_upto' was specified.
|
||||
*/
|
||||
if (!XLogRecPtrIsValid(lsn_upto))
|
||||
{
|
||||
SpinLockAcquire(&shared->mutex);
|
||||
lsn_upto = shared->lsn_upto;
|
||||
/* 'done' should be set at the same time as 'lsn_upto' */
|
||||
done = shared->done;
|
||||
SpinLockRelease(&shared->mutex);
|
||||
}
|
||||
if (XLogRecPtrIsValid(lsn_upto) &&
|
||||
ctx->reader->EndRecPtr >= lsn_upto)
|
||||
break;
|
||||
|
||||
if (record == NULL)
|
||||
{
|
||||
int64 timeout = 0;
|
||||
WaitLSNResult res;
|
||||
|
||||
/*
|
||||
* Before we retry reading, wait until new WAL is flushed.
|
||||
*
|
||||
* There is a race condition such that the backend executing
|
||||
* REPACK determines 'lsn_upto', but before it sets the shared
|
||||
* variable, we reach the end of WAL. In that case we'd need to
|
||||
* wait until the next WAL flush (unrelated to REPACK). Although
|
||||
* that should not be a problem in a busy system, it might be
|
||||
* noticeable in other cases, including regression tests (which
|
||||
* are not necessarily executed in parallel). Therefore it makes
|
||||
* sense to use timeout.
|
||||
*
|
||||
* If lsn_upto is valid, WAL records having LSN lower than that
|
||||
* should already have been flushed to disk.
|
||||
*/
|
||||
if (!XLogRecPtrIsValid(lsn_upto))
|
||||
timeout = 100L;
|
||||
res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
|
||||
ctx->reader->EndRecPtr + 1,
|
||||
timeout);
|
||||
if (res != WAIT_LSN_RESULT_SUCCESS &&
|
||||
res != WAIT_LSN_RESULT_TIMEOUT)
|
||||
ereport(ERROR,
|
||||
errmsg("waiting for WAL failed"));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Close the file so we can make it available to the backend.
|
||||
*/
|
||||
BufFileClose(dstate->file);
|
||||
dstate->file = NULL;
|
||||
SpinLockAcquire(&shared->mutex);
|
||||
shared->lsn_upto = InvalidXLogRecPtr;
|
||||
shared->last_exported++;
|
||||
SpinLockRelease(&shared->mutex);
|
||||
ConditionVariableSignal(&shared->cv);
|
||||
|
||||
return done;
|
||||
}
|
||||
|
||||
/*
|
||||
* Does the WAL record contain a data change that this backend does not need
|
||||
* to decode on behalf of REPACK (CONCURRENTLY)?
|
||||
*/
|
||||
bool
|
||||
change_useless_for_repack(XLogRecordBuffer *buf)
|
||||
{
|
||||
XLogReaderState *r = buf->record;
|
||||
RelFileLocator locator;
|
||||
|
||||
/* TOAST locator should not be set unless the main is. */
|
||||
Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
|
||||
OidIsValid(repacked_rel_locator.relNumber));
|
||||
|
||||
/*
|
||||
* Backends not involved in REPACK (CONCURRENTLY) should not do the
|
||||
* filtering.
|
||||
*/
|
||||
if (!OidIsValid(repacked_rel_locator.relNumber))
|
||||
return false;
|
||||
|
||||
/*
|
||||
* If the record does not contain the block 0, it's probably not INSERT /
|
||||
* UPDATE / DELETE. In any case, we do not have enough information to
|
||||
* filter the change out.
|
||||
*/
|
||||
if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
|
||||
return false;
|
||||
|
||||
/*
|
||||
* Decode the change if it belongs to the table we are repacking, or if it
|
||||
* belongs to its TOAST relation.
|
||||
*/
|
||||
if (RelFileLocatorEquals(locator, repacked_rel_locator))
|
||||
return false;
|
||||
if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
|
||||
RelFileLocatorEquals(locator, repacked_rel_toast_locator))
|
||||
return false;
|
||||
|
||||
/* Filter out changes of other tables. */
|
||||
return true;
|
||||
}
|
||||
|
|
@ -6058,6 +6058,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
|
|||
finish_heap_swap(tab->relid, OIDNewHeap,
|
||||
false, false, true,
|
||||
!OidIsValid(tab->newTableSpace),
|
||||
true, /* reindex */
|
||||
RecentXmin,
|
||||
ReadNextMultiXactId(),
|
||||
persistence);
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
|
|||
TransactionId lastSaneFrozenXid,
|
||||
MultiXactId lastSaneMinMulti);
|
||||
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
|
||||
BufferAccessStrategy bstrategy);
|
||||
BufferAccessStrategy bstrategy, bool isTopLevel);
|
||||
static double compute_parallel_delay(void);
|
||||
static VacOptValue get_vacoptval_from_boolean(DefElem *def);
|
||||
static bool vac_tid_reaped(ItemPointer itemptr, void *state);
|
||||
|
|
@ -630,7 +630,8 @@ vacuum(List *relations, const VacuumParams *params, BufferAccessStrategy bstrate
|
|||
|
||||
if (params->options & VACOPT_VACUUM)
|
||||
{
|
||||
if (!vacuum_rel(vrel->oid, vrel->relation, *params, bstrategy))
|
||||
if (!vacuum_rel(vrel->oid, vrel->relation, *params, bstrategy,
|
||||
isTopLevel))
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -2004,7 +2005,7 @@ vac_truncate_clog(TransactionId frozenXID,
|
|||
*/
|
||||
static bool
|
||||
vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
|
||||
BufferAccessStrategy bstrategy)
|
||||
BufferAccessStrategy bstrategy, bool isTopLevel)
|
||||
{
|
||||
LOCKMODE lmode;
|
||||
Relation rel;
|
||||
|
|
@ -2295,7 +2296,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
|
|||
|
||||
/* VACUUM FULL is a variant of REPACK; see repack.c */
|
||||
cluster_rel(REPACK_COMMAND_VACUUMFULL, rel, InvalidOid,
|
||||
&cluster_params);
|
||||
&cluster_params, isTopLevel);
|
||||
/* cluster_rel closes the relation, but keeps lock */
|
||||
|
||||
rel = NULL;
|
||||
|
|
@ -2338,7 +2339,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
|
|||
toast_vacuum_params.options |= VACOPT_PROCESS_MAIN;
|
||||
toast_vacuum_params.toast_parent = relid;
|
||||
|
||||
vacuum_rel(toast_relid, NULL, toast_vacuum_params, bstrategy);
|
||||
vacuum_rel(toast_relid, NULL, toast_vacuum_params, bstrategy,
|
||||
isTopLevel);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
#include "postgres.h"
|
||||
|
||||
#include "access/parallel.h"
|
||||
#include "commands/repack.h"
|
||||
#include "libpq/libpq.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "libpq/pqmq.h"
|
||||
|
|
@ -177,6 +178,10 @@ mq_putmessage(char msgtype, const char *s, size_t len)
|
|||
SendProcSignal(pq_mq_parallel_leader_pid,
|
||||
PROCSIG_PARALLEL_APPLY_MESSAGE,
|
||||
pq_mq_parallel_leader_proc_number);
|
||||
else if (AmRepackWorker())
|
||||
SendProcSignal(pq_mq_parallel_leader_pid,
|
||||
PROCSIG_REPACK_MESSAGE,
|
||||
pq_mq_parallel_leader_proc_number);
|
||||
else
|
||||
{
|
||||
Assert(IsParallelWorker());
|
||||
|
|
|
|||
|
|
@ -219,5 +219,6 @@ pg_test_mod_args = pg_mod_args + {
|
|||
subdir('jit/llvm')
|
||||
subdir('replication/libpqwalreceiver')
|
||||
subdir('replication/pgoutput')
|
||||
subdir('replication/pgrepack')
|
||||
subdir('snowball')
|
||||
subdir('utils/mb/conversion_procs')
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@
|
|||
#include "postgres.h"
|
||||
|
||||
#include "access/parallel.h"
|
||||
#include "commands/repack.h"
|
||||
#include "libpq/pqsignal.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
|
|
@ -146,6 +147,10 @@ static const struct
|
|||
.fn_name = "ParallelWorkerMain",
|
||||
.fn_addr = ParallelWorkerMain
|
||||
},
|
||||
{
|
||||
.fn_name = "RepackWorkerMain",
|
||||
.fn_addr = RepackWorkerMain
|
||||
},
|
||||
{
|
||||
.fn_name = "SequenceSyncWorkerMain",
|
||||
.fn_addr = SequenceSyncWorkerMain
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@
|
|||
#include "access/xlogreader.h"
|
||||
#include "access/xlogrecord.h"
|
||||
#include "catalog/pg_control.h"
|
||||
#include "commands/repack.h"
|
||||
#include "replication/decode.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/message.h"
|
||||
|
|
@ -436,7 +437,8 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
{
|
||||
case XLOG_HEAP2_MULTI_INSERT:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
|
||||
!ctx->fast_forward)
|
||||
!ctx->fast_forward &&
|
||||
!change_useless_for_repack(buf))
|
||||
DecodeMultiInsert(ctx, buf);
|
||||
break;
|
||||
case XLOG_HEAP2_NEW_CID:
|
||||
|
|
@ -498,7 +500,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
{
|
||||
case XLOG_HEAP_INSERT:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
|
||||
!ctx->fast_forward)
|
||||
!ctx->fast_forward &&
|
||||
!change_useless_for_repack(buf))
|
||||
DecodeInsert(ctx, buf);
|
||||
break;
|
||||
|
||||
|
|
@ -510,19 +513,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
case XLOG_HEAP_HOT_UPDATE:
|
||||
case XLOG_HEAP_UPDATE:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
|
||||
!ctx->fast_forward)
|
||||
!ctx->fast_forward &&
|
||||
!change_useless_for_repack(buf))
|
||||
DecodeUpdate(ctx, buf);
|
||||
break;
|
||||
|
||||
case XLOG_HEAP_DELETE:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
|
||||
!ctx->fast_forward)
|
||||
!ctx->fast_forward &&
|
||||
!change_useless_for_repack(buf))
|
||||
DecodeDelete(ctx, buf);
|
||||
break;
|
||||
|
||||
case XLOG_HEAP_TRUNCATE:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
|
||||
!ctx->fast_forward)
|
||||
!ctx->fast_forward &&
|
||||
!change_useless_for_repack(buf))
|
||||
DecodeTruncate(ctx, buf);
|
||||
break;
|
||||
|
||||
|
|
@ -538,7 +544,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
|
||||
case XLOG_HEAP_CONFIRM:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
|
||||
!ctx->fast_forward)
|
||||
!ctx->fast_forward &&
|
||||
!change_useless_for_repack(buf))
|
||||
DecodeSpecConfirm(ctx, buf);
|
||||
break;
|
||||
|
||||
|
|
@ -1035,6 +1042,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
|
||||
xlrec = (xl_heap_delete *) XLogRecGetData(r);
|
||||
|
||||
/*
|
||||
* Skip changes that were marked as ignorable at origin.
|
||||
*
|
||||
* (This is used for changes that affect relations not visible to other
|
||||
* transactions, such as the transient table during concurrent repack.)
|
||||
*/
|
||||
if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
|
||||
return;
|
||||
|
||||
/* only interested in our database */
|
||||
XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
|
||||
if (target_locator.dbOid != ctx->slot->data.database)
|
||||
|
|
|
|||
32
src/backend/replication/pgrepack/Makefile
Normal file
32
src/backend/replication/pgrepack/Makefile
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
#-------------------------------------------------------------------------
|
||||
#
|
||||
# Makefile--
|
||||
# Makefile for src/backend/replication/pgrepack
|
||||
#
|
||||
# IDENTIFICATION
|
||||
# src/backend/replication/pgrepack
|
||||
#
|
||||
#-------------------------------------------------------------------------
|
||||
|
||||
subdir = src/backend/replication/pgrepack
|
||||
top_builddir = ../../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
|
||||
OBJS = \
|
||||
$(WIN32RES) \
|
||||
pgrepack.o
|
||||
PGFILEDESC = "pgrepack - logical replication output plugin for REPACK"
|
||||
NAME = pgrepack
|
||||
|
||||
all: all-shared-lib
|
||||
|
||||
include $(top_srcdir)/src/Makefile.shlib
|
||||
|
||||
install: all installdirs install-lib
|
||||
|
||||
installdirs: installdirs-lib
|
||||
|
||||
uninstall: uninstall-lib
|
||||
|
||||
clean distclean: clean-lib
|
||||
rm -f $(OBJS)
|
||||
18
src/backend/replication/pgrepack/meson.build
Normal file
18
src/backend/replication/pgrepack/meson.build
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
# Copyright (c) 2026, PostgreSQL Global Development Group
|
||||
|
||||
pgrepack_sources = files(
|
||||
'pgrepack.c',
|
||||
)
|
||||
|
||||
if host_system == 'windows'
|
||||
pgrepack_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
|
||||
'--NAME', 'pgrepack',
|
||||
'--FILEDESC', 'pgrepack - logical replication output plugin for REPACK',])
|
||||
endif
|
||||
|
||||
pgrepack = shared_module('pgrepack',
|
||||
pgrepack_sources,
|
||||
kwargs: pg_mod_args,
|
||||
)
|
||||
|
||||
backend_targets += pgrepack
|
||||
287
src/backend/replication/pgrepack/pgrepack.c
Normal file
287
src/backend/replication/pgrepack/pgrepack.c
Normal file
|
|
@ -0,0 +1,287 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* pgrepack.c
|
||||
* Logical Replication output plugin for REPACK command
|
||||
*
|
||||
* Copyright (c) 2026, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/replication/pgrepack/pgrepack.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/detoast.h"
|
||||
#include "commands/repack_internal.h"
|
||||
#include "replication/snapbuild.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
static void repack_startup(LogicalDecodingContext *ctx,
|
||||
OutputPluginOptions *opt, bool is_init);
|
||||
static void repack_shutdown(LogicalDecodingContext *ctx);
|
||||
static void repack_begin_txn(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn);
|
||||
static void repack_commit_txn(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
|
||||
static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
Relation rel, ReorderBufferChange *change);
|
||||
static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
|
||||
ConcurrentChangeKind kind, HeapTuple tuple);
|
||||
|
||||
void
|
||||
_PG_output_plugin_init(OutputPluginCallbacks *cb)
|
||||
{
|
||||
cb->startup_cb = repack_startup;
|
||||
cb->begin_cb = repack_begin_txn;
|
||||
cb->change_cb = repack_process_change;
|
||||
cb->commit_cb = repack_commit_txn;
|
||||
cb->shutdown_cb = repack_shutdown;
|
||||
}
|
||||
|
||||
|
||||
/* initialize this plugin */
|
||||
static void
|
||||
repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
||||
bool is_init)
|
||||
{
|
||||
ctx->output_plugin_private = NULL;
|
||||
|
||||
/* Probably unnecessary, as we don't use the SQL interface ... */
|
||||
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
|
||||
|
||||
if (ctx->output_plugin_options != NIL)
|
||||
{
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("this plugin does not expect any options"));
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
repack_shutdown(LogicalDecodingContext *ctx)
|
||||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* As we don't release the slot during processing of particular table, there's
|
||||
* no room for SQL interface, even for debugging purposes. Therefore we need
|
||||
* neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
|
||||
* callbacks. (Although we might want to write custom callbacks, this API
|
||||
* seems to be unnecessarily generic for our purposes.)
|
||||
*/
|
||||
|
||||
/* BEGIN callback */
|
||||
static void
|
||||
repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
||||
{
|
||||
}
|
||||
|
||||
/* COMMIT callback */
|
||||
static void
|
||||
repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn)
|
||||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback for individual changed tuples
|
||||
*/
|
||||
static void
|
||||
repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change)
|
||||
{
|
||||
RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
|
||||
(RepackDecodingState *) ctx->output_writer_private;
|
||||
|
||||
/* Changes of other relation should not have been decoded. */
|
||||
Assert(RelationGetRelid(relation) == private->relid);
|
||||
|
||||
/* Decode entry depending on its type */
|
||||
switch (change->action)
|
||||
{
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
{
|
||||
HeapTuple newtuple;
|
||||
|
||||
newtuple = change->data.tp.newtuple;
|
||||
|
||||
/*
|
||||
* Identity checks in the main function should have made this
|
||||
* impossible.
|
||||
*/
|
||||
if (newtuple == NULL)
|
||||
elog(ERROR, "incomplete insert info");
|
||||
|
||||
repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
{
|
||||
HeapTuple oldtuple,
|
||||
newtuple;
|
||||
|
||||
oldtuple = change->data.tp.oldtuple;
|
||||
newtuple = change->data.tp.newtuple;
|
||||
|
||||
if (newtuple == NULL)
|
||||
elog(ERROR, "incomplete update info");
|
||||
|
||||
if (oldtuple != NULL)
|
||||
repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
|
||||
|
||||
repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
{
|
||||
HeapTuple oldtuple;
|
||||
|
||||
oldtuple = change->data.tp.oldtuple;
|
||||
|
||||
if (oldtuple == NULL)
|
||||
elog(ERROR, "incomplete delete info");
|
||||
|
||||
repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
||||
/*
|
||||
* Should not come here. This includes TRUNCATE of the table being
|
||||
* processed. heap_decode() cannot check the file locator easily,
|
||||
* but we assume that TRUNCATE uses AccessExclusiveLock on the
|
||||
* table so it should not occur during REPACK (CONCURRENTLY).
|
||||
*/
|
||||
Assert(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Write the given tuple, with the given change kind, to the repack spill
|
||||
* file. Later, the repack decoding worker can read these and replay
|
||||
* the operations on the new copy of the table.
|
||||
*
|
||||
* For each change affecting the table being repacked, we store enough
|
||||
* information about each tuple in it, so that it can be replayed in the
|
||||
* new copy of the table.
|
||||
*/
|
||||
static void
|
||||
repack_store_change(LogicalDecodingContext *ctx, Relation relation,
|
||||
ConcurrentChangeKind kind, HeapTuple tuple)
|
||||
{
|
||||
RepackDecodingState *dstate;
|
||||
MemoryContext oldcxt;
|
||||
BufFile *file;
|
||||
List *attrs_ext = NIL;
|
||||
int natt_ext;
|
||||
|
||||
dstate = (RepackDecodingState *) ctx->output_writer_private;
|
||||
file = dstate->file;
|
||||
|
||||
/* Store the change kind. */
|
||||
BufFileWrite(file, &kind, 1);
|
||||
|
||||
/* Use a frequently-reset context to avoid dealing with leaks manually */
|
||||
oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
|
||||
|
||||
/*
|
||||
* If the tuple contains "external indirect" attributes, we need to write
|
||||
* the contents to the file because we have no control over that memory.
|
||||
*/
|
||||
if (HeapTupleHasExternal(tuple))
|
||||
{
|
||||
TupleDesc desc = RelationGetDescr(relation);
|
||||
TupleTableSlot *slot;
|
||||
|
||||
/* Initialize the slot, if not done already */
|
||||
if (dstate->slot == NULL)
|
||||
{
|
||||
ResourceOwner saveResourceOwner;
|
||||
|
||||
MemoryContextSwitchTo(dstate->worker_cxt);
|
||||
saveResourceOwner = CurrentResourceOwner;
|
||||
CurrentResourceOwner = dstate->worker_resowner;
|
||||
dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
|
||||
MemoryContextSwitchTo(dstate->change_cxt);
|
||||
CurrentResourceOwner = saveResourceOwner;
|
||||
}
|
||||
|
||||
slot = dstate->slot;
|
||||
ExecStoreHeapTuple(tuple, slot, false);
|
||||
|
||||
/*
|
||||
* Loop over all attributes, and find out which ones we need to spill
|
||||
* separately, to wit: each one that's a non-null varlena and stored
|
||||
* out of line.
|
||||
*/
|
||||
for (int i = 0; i < desc->natts; i++)
|
||||
{
|
||||
CompactAttribute *attr = TupleDescCompactAttr(desc, i);
|
||||
varlena *varlen;
|
||||
|
||||
if (attr->attisdropped || attr->attlen != -1 ||
|
||||
slot_attisnull(slot, i + 1))
|
||||
continue;
|
||||
|
||||
slot_getsomeattrs(slot, i + 1);
|
||||
|
||||
/*
|
||||
* This is a non-null varlena datum, but we only care if it's
|
||||
* out-of-line
|
||||
*/
|
||||
varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
|
||||
if (!VARATT_IS_EXTERNAL(varlen))
|
||||
continue;
|
||||
|
||||
/*
|
||||
* We spill any indirect-external attributes separately from the
|
||||
* heap tuple. Anything else is written as is.
|
||||
*/
|
||||
if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
|
||||
attrs_ext = lappend(attrs_ext, varlen);
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Logical decoding should not produce "external expanded"
|
||||
* attributes (those actually should never appear on disk), so
|
||||
* only TOASTed attribute can be seen here.
|
||||
*
|
||||
* We get here if the table has external values but only
|
||||
* in-line values are being updated now.
|
||||
*/
|
||||
Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
|
||||
}
|
||||
}
|
||||
|
||||
ExecClearTuple(slot);
|
||||
}
|
||||
|
||||
/*
|
||||
* First, write the original heap tuple, prefixed by its length. Note
|
||||
* that the external-toast tag for each toasted attribute will be present
|
||||
* in what we write, so that we know where to restore each one later.
|
||||
*/
|
||||
BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
|
||||
BufFileWrite(file, tuple->t_data, tuple->t_len);
|
||||
|
||||
/* Then, write the number of external attributes we found. */
|
||||
natt_ext = list_length(attrs_ext);
|
||||
BufFileWrite(file, &natt_ext, sizeof(natt_ext));
|
||||
|
||||
/* Finally, the attributes themselves, if any */
|
||||
foreach_ptr(varlena, attr_val, attrs_ext)
|
||||
{
|
||||
attr_val = detoast_external_attr(attr_val);
|
||||
BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
|
||||
/* These attributes could be large, so free them right away */
|
||||
pfree(attr_val);
|
||||
}
|
||||
|
||||
/* Cleanup. */
|
||||
MemoryContextSwitchTo(oldcxt);
|
||||
MemoryContextReset(dstate->change_cxt);
|
||||
}
|
||||
|
|
@ -707,6 +707,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
|
|||
if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
|
||||
HandleParallelApplyMessageInterrupt();
|
||||
|
||||
if (CheckProcSignal(PROCSIG_REPACK_MESSAGE))
|
||||
HandleRepackMessageInterrupt();
|
||||
|
||||
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT))
|
||||
HandleRecoveryConflictInterrupt();
|
||||
|
||||
|
|
|
|||
|
|
@ -3578,6 +3578,9 @@ ProcessInterrupts(void)
|
|||
|
||||
if (ParallelApplyMessagePending)
|
||||
ProcessParallelApplyMessages();
|
||||
|
||||
if (RepackMessagePending)
|
||||
ProcessRepackMessages();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -156,6 +156,7 @@ RECOVERY_CONFLICT_SNAPSHOT "Waiting for recovery conflict resolution for a vacuu
|
|||
RECOVERY_CONFLICT_TABLESPACE "Waiting for recovery conflict resolution for dropping a tablespace."
|
||||
RECOVERY_END_COMMAND "Waiting for <xref linkend="guc-recovery-end-command"/> to complete."
|
||||
RECOVERY_PAUSE "Waiting for recovery to be resumed."
|
||||
REPACK_WORKER_EXPORT "Waiting for decoding worker to export a new output file."
|
||||
REPLICATION_ORIGIN_DROP "Waiting for a replication origin to become inactive so it can be dropped."
|
||||
REPLICATION_SLOT_DROP "Waiting for a replication slot to become inactive so it can be dropped."
|
||||
RESTORE_COMMAND "Waiting for <xref linkend="guc-restore-command"/> to complete."
|
||||
|
|
|
|||
|
|
@ -5236,8 +5236,8 @@ match_previous_words(int pattern_id,
|
|||
* one word, so the above test is correct.
|
||||
*/
|
||||
if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
|
||||
COMPLETE_WITH("ANALYZE", "VERBOSE");
|
||||
else if (TailMatches("ANALYZE", "VERBOSE"))
|
||||
COMPLETE_WITH("ANALYZE", "CONCURRENTLY", "VERBOSE");
|
||||
else if (TailMatches("ANALYZE", "CONCURRENTLY", "VERBOSE"))
|
||||
COMPLETE_WITH("ON", "OFF");
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -104,6 +104,8 @@
|
|||
#define XLH_DELETE_CONTAINS_OLD_KEY (1<<2)
|
||||
#define XLH_DELETE_IS_SUPER (1<<3)
|
||||
#define XLH_DELETE_IS_PARTITION_MOVE (1<<4)
|
||||
/* See heap_delete() */
|
||||
#define XLH_DELETE_NO_LOGICAL (1<<5)
|
||||
|
||||
/* convenience macro for checking whether any form of old tuple was logged */
|
||||
#define XLH_DELETE_CONTAINS_OLD \
|
||||
|
|
|
|||
|
|
@ -284,9 +284,10 @@ typedef struct TM_IndexDeleteOp
|
|||
|
||||
/* "options" flag bits for table_tuple_delete */
|
||||
#define TABLE_DELETE_CHANGING_PARTITION (1 << 0)
|
||||
#define TABLE_DELETE_NO_LOGICAL (1 << 1)
|
||||
|
||||
/* "options" flag bits for table_tuple_update */
|
||||
/* XXX none at present */
|
||||
#define TABLE_UPDATE_NO_LOGICAL (1 << 0)
|
||||
|
||||
/* flag bits for table_tuple_lock */
|
||||
/* Follow tuples whose update is in progress if lock modes don't conflict */
|
||||
|
|
@ -662,6 +663,7 @@ typedef struct TableAmRoutine
|
|||
Relation OldIndex,
|
||||
bool use_sort,
|
||||
TransactionId OldestXmin,
|
||||
Snapshot snapshot,
|
||||
TransactionId *xid_cutoff,
|
||||
MultiXactId *multi_cutoff,
|
||||
double *num_tuples,
|
||||
|
|
@ -1563,7 +1565,12 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
|
|||
* cmax/cmin if successful)
|
||||
* options - bitmask of options. No values are currently recognized.
|
||||
* crosscheck - if not InvalidSnapshot, also check old tuple against this
|
||||
* wait - true if should wait for any conflicting update to commit/abort
|
||||
* options - These allow the caller to specify options that may change the
|
||||
* behavior of the AM. The AM will ignore options that it does not support.
|
||||
* TABLE_UPDATE_WAIT -- set if should wait for any conflicting update to
|
||||
* commit/abort
|
||||
* TABLE_UPDATE_NO_LOGICAL -- force-disables the emitting of logical
|
||||
* decoding information for the tuple.
|
||||
*
|
||||
* Output parameters:
|
||||
* slot - newly constructed tuple data to store
|
||||
|
|
@ -1725,6 +1732,8 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator)
|
|||
* not needed for the relation's AM
|
||||
* - *xid_cutoff - ditto
|
||||
* - *multi_cutoff - ditto
|
||||
* - snapshot - if != NULL, ignore data changes done by transactions that this
|
||||
* (MVCC) snapshot considers still in-progress or in the future.
|
||||
*
|
||||
* Output parameters:
|
||||
* - *xid_cutoff - rel's new relfrozenxid value, may be invalid
|
||||
|
|
@ -1737,6 +1746,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
|
|||
Relation OldIndex,
|
||||
bool use_sort,
|
||||
TransactionId OldestXmin,
|
||||
Snapshot snapshot,
|
||||
TransactionId *xid_cutoff,
|
||||
MultiXactId *multi_cutoff,
|
||||
double *num_tuples,
|
||||
|
|
@ -1745,6 +1755,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
|
|||
{
|
||||
OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex,
|
||||
use_sort, OldestXmin,
|
||||
snapshot,
|
||||
xid_cutoff, multi_cutoff,
|
||||
num_tuples, tups_vacuumed,
|
||||
tups_recently_dead);
|
||||
|
|
|
|||
|
|
@ -86,10 +86,12 @@
|
|||
#define PROGRESS_REPACK_PHASE 1
|
||||
#define PROGRESS_REPACK_INDEX_RELID 2
|
||||
#define PROGRESS_REPACK_HEAP_TUPLES_SCANNED 3
|
||||
#define PROGRESS_REPACK_HEAP_TUPLES_WRITTEN 4
|
||||
#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 5
|
||||
#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 6
|
||||
#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 7
|
||||
#define PROGRESS_REPACK_HEAP_TUPLES_INSERTED 4
|
||||
#define PROGRESS_REPACK_HEAP_TUPLES_UPDATED 5
|
||||
#define PROGRESS_REPACK_HEAP_TUPLES_DELETED 6
|
||||
#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 7
|
||||
#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 8
|
||||
#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 9
|
||||
|
||||
/*
|
||||
* Phases of repack (as advertised via PROGRESS_REPACK_PHASE).
|
||||
|
|
@ -98,9 +100,10 @@
|
|||
#define PROGRESS_REPACK_PHASE_INDEX_SCAN_HEAP 2
|
||||
#define PROGRESS_REPACK_PHASE_SORT_TUPLES 3
|
||||
#define PROGRESS_REPACK_PHASE_WRITE_NEW_HEAP 4
|
||||
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 5
|
||||
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 6
|
||||
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 7
|
||||
#define PROGRESS_REPACK_PHASE_CATCH_UP 5
|
||||
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 6
|
||||
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 7
|
||||
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 8
|
||||
|
||||
/* Progress parameters for CREATE INDEX */
|
||||
/* 3, 4 and 5 reserved for "waitfor" metrics */
|
||||
|
|
|
|||
|
|
@ -13,6 +13,8 @@
|
|||
#ifndef REPACK_H
|
||||
#define REPACK_H
|
||||
|
||||
#include <signal.h>
|
||||
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "parser/parse_node.h"
|
||||
#include "storage/lockdefs.h"
|
||||
|
|
@ -25,6 +27,7 @@
|
|||
#define CLUOPT_RECHECK_ISCLUSTERED 0x04 /* recheck relation state for
|
||||
* indisclustered */
|
||||
#define CLUOPT_ANALYZE 0x08 /* do an ANALYZE */
|
||||
#define CLUOPT_CONCURRENT 0x10 /* allow concurrent data changes */
|
||||
|
||||
/* options for CLUSTER */
|
||||
typedef struct ClusterParams
|
||||
|
|
@ -32,11 +35,13 @@ typedef struct ClusterParams
|
|||
uint32 options; /* bitmask of CLUOPT_* */
|
||||
} ClusterParams;
|
||||
|
||||
extern PGDLLIMPORT volatile sig_atomic_t RepackMessagePending;
|
||||
|
||||
|
||||
extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
|
||||
|
||||
extern void cluster_rel(RepackCommand command, Relation OldHeap, Oid indexOid,
|
||||
ClusterParams *params);
|
||||
ClusterParams *params, bool isTopLevel);
|
||||
extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
|
||||
LOCKMODE lockmode);
|
||||
extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
|
||||
|
|
@ -48,8 +53,16 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
|
|||
bool swap_toast_by_content,
|
||||
bool check_constraints,
|
||||
bool is_internal,
|
||||
bool reindex,
|
||||
TransactionId frozenXid,
|
||||
MultiXactId cutoffMulti,
|
||||
char newrelpersistence);
|
||||
|
||||
extern void HandleRepackMessageInterrupt(void);
|
||||
extern void ProcessRepackMessages(void);
|
||||
|
||||
/* in repack_worker.c */
|
||||
extern void RepackWorkerMain(Datum main_arg);
|
||||
extern bool AmRepackWorker(void);
|
||||
|
||||
#endif /* REPACK_H */
|
||||
|
|
|
|||
125
src/include/commands/repack_internal.h
Normal file
125
src/include/commands/repack_internal.h
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* repack_internal.h
|
||||
* header for REPACK internals
|
||||
*
|
||||
* Copyright (c) 2026, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/commands/repack_internal.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef REPACK_INTERNAL_H
|
||||
#define REPACK_INTERNAL_H
|
||||
|
||||
#include "nodes/execnodes.h"
|
||||
#include "replication/decode.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "replication/logical.h"
|
||||
#include "storage/buffile.h"
|
||||
#include "storage/sharedfileset.h"
|
||||
#include "storage/shm_mq.h"
|
||||
#include "utils/resowner.h"
|
||||
|
||||
/*
|
||||
* The type of a change stored in the output files.
|
||||
*/
|
||||
typedef char ConcurrentChangeKind;
|
||||
|
||||
#define CHANGE_INSERT 'i'
|
||||
#define CHANGE_UPDATE_OLD 'u'
|
||||
#define CHANGE_UPDATE_NEW 'U'
|
||||
#define CHANGE_DELETE 'd'
|
||||
|
||||
/*
|
||||
* Logical decoding state.
|
||||
*
|
||||
* The output plugin uses it to store the data changes that it decodes from
|
||||
* WAL while the table contents is being copied to a new storage.
|
||||
*/
|
||||
typedef struct RepackDecodingState
|
||||
{
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
/* The relation whose changes we're decoding. */
|
||||
Oid relid;
|
||||
#endif
|
||||
|
||||
/* Per-change memory context. */
|
||||
MemoryContext change_cxt;
|
||||
|
||||
/* A tuple slot used to pass tuples back and forth */
|
||||
TupleTableSlot *slot;
|
||||
|
||||
/*
|
||||
* Memory context and resource owner of the decoding worker's transaction.
|
||||
*/
|
||||
MemoryContext worker_cxt;
|
||||
ResourceOwner worker_resowner;
|
||||
|
||||
/* The current output file. */
|
||||
BufFile *file;
|
||||
} RepackDecodingState;
|
||||
|
||||
/*
|
||||
* Shared memory used for communication between the backend running REPACK and
|
||||
* the worker that performs logical decoding of data changes.
|
||||
*/
|
||||
typedef struct DecodingWorkerShared
|
||||
{
|
||||
/* Is the decoding initialized? */
|
||||
bool initialized;
|
||||
|
||||
/*
|
||||
* Once the worker has reached this LSN, it should close the current
|
||||
* output file and either create a new one or exit, according to the field
|
||||
* 'done'. If the value is InvalidXLogRecPtr, the worker should decode all
|
||||
* the WAL available and keep checking this field. It is ok if the worker
|
||||
* had already decoded records whose LSN is >= lsn_upto before this field
|
||||
* has been set.
|
||||
*/
|
||||
XLogRecPtr lsn_upto;
|
||||
|
||||
/* Exit after closing the current file? */
|
||||
bool done;
|
||||
|
||||
/* The output is stored here. */
|
||||
SharedFileSet sfs;
|
||||
|
||||
/* Number of the last file exported by the worker. */
|
||||
int last_exported;
|
||||
|
||||
/* Synchronize access to the fields above. */
|
||||
slock_t mutex;
|
||||
|
||||
/* Database to connect to. */
|
||||
Oid dbid;
|
||||
|
||||
/* Role to connect as. */
|
||||
Oid roleid;
|
||||
|
||||
/* Relation from which data changes to decode. */
|
||||
Oid relid;
|
||||
|
||||
/* CV the backend waits on */
|
||||
ConditionVariable cv;
|
||||
|
||||
/* Info to signal the backend. */
|
||||
PGPROC *backend_proc;
|
||||
pid_t backend_pid;
|
||||
ProcNumber backend_proc_number;
|
||||
dsm_segment *dsm_seg;
|
||||
|
||||
/*
|
||||
* Memory the queue is located in.
|
||||
*
|
||||
* For considerations on the value see the comments of
|
||||
* PARALLEL_ERROR_QUEUE_SIZE.
|
||||
*/
|
||||
#define REPACK_ERROR_QUEUE_SIZE 16384
|
||||
char error_queue[FLEXIBLE_ARRAY_MEMBER];
|
||||
} DecodingWorkerShared;
|
||||
|
||||
extern void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq);
|
||||
|
||||
|
||||
#endif /* REPACK_INTERNAL_H */
|
||||
|
|
@ -32,4 +32,8 @@ extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
|
|||
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
|
||||
XLogReaderState *record);
|
||||
|
||||
/* in commands/repack_worker.c */
|
||||
extern bool change_useless_for_repack(XLogRecordBuffer *buf);
|
||||
|
||||
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -36,8 +36,8 @@ typedef int LOCKMODE;
|
|||
#define AccessShareLock 1 /* SELECT */
|
||||
#define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE */
|
||||
#define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE */
|
||||
#define ShareUpdateExclusiveLock 4 /* VACUUM (non-FULL), ANALYZE, CREATE
|
||||
* INDEX CONCURRENTLY */
|
||||
#define ShareUpdateExclusiveLock 4 /* VACUUM (non-exclusive), ANALYZE, CREATE
|
||||
* INDEX CONCURRENTLY, REPACK CONCURRENTLY */
|
||||
#define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY) */
|
||||
#define ShareRowExclusiveLock 6 /* like EXCLUSIVE MODE, but allows ROW
|
||||
* SHARE */
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ typedef enum
|
|||
PROCSIG_BARRIER, /* global barrier interrupt */
|
||||
PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
|
||||
PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
|
||||
PROCSIG_REPACK_MESSAGE, /* Message from repack worker */
|
||||
PROCSIG_RECOVERY_CONFLICT, /* backend is blocking recovery, check
|
||||
* PGPROC->pendingRecoveryConflicts for the
|
||||
* reason */
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ ifneq (,$(findstring backend,$(subdir)))
|
|||
ifeq (,$(findstring conversion_procs,$(subdir)))
|
||||
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
|
||||
ifeq (,$(findstring replication/pgoutput,$(subdir)))
|
||||
ifeq (,$(findstring replication/pgrepack,$(subdir)))
|
||||
ifeq (,$(findstring snowball,$(subdir)))
|
||||
override CPPFLAGS+= -DBUILDING_DLL
|
||||
endif
|
||||
|
|
@ -23,6 +24,7 @@ endif
|
|||
endif
|
||||
endif
|
||||
endif
|
||||
endif
|
||||
|
||||
ifneq (,$(findstring src/common,$(subdir)))
|
||||
override CPPFLAGS+= -DBUILDING_DLL
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ ifneq (,$(findstring backend,$(subdir)))
|
|||
ifeq (,$(findstring conversion_procs,$(subdir)))
|
||||
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
|
||||
ifeq (,$(findstring replication/pgoutput,$(subdir)))
|
||||
ifeq (,$(findstring replication/pgrepack,$(subdir)))
|
||||
ifeq (,$(findstring snowball,$(subdir)))
|
||||
override CPPFLAGS+= -DBUILDING_DLL
|
||||
endif
|
||||
|
|
@ -21,6 +22,7 @@ endif
|
|||
endif
|
||||
endif
|
||||
endif
|
||||
endif
|
||||
|
||||
ifneq (,$(findstring src/common,$(subdir)))
|
||||
override CPPFLAGS+= -DBUILDING_DLL
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
|
|||
|
||||
ISOLATION = basic \
|
||||
inplace \
|
||||
repack \
|
||||
repack_toast \
|
||||
syscache-update-pruned \
|
||||
heap_lock_update
|
||||
|
||||
|
|
|
|||
113
src/test/modules/injection_points/expected/repack.out
Normal file
113
src/test/modules/injection_points/expected/repack.out
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: wait_before_lock change_existing change_new change_subxact1 change_subxact2 check2 wakeup_before_lock check1
|
||||
injection_points_attach
|
||||
-----------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step wait_before_lock:
|
||||
REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
|
||||
<waiting ...>
|
||||
step change_existing:
|
||||
UPDATE repack_test SET i=10 where i=1;
|
||||
UPDATE repack_test SET j=20 where i=2;
|
||||
UPDATE repack_test SET i=30 where i=3;
|
||||
UPDATE repack_test SET i=40 where i=30;
|
||||
DELETE FROM repack_test WHERE i=4;
|
||||
|
||||
step change_new:
|
||||
INSERT INTO repack_test(i, j) VALUES (5, 5), (6, 6), (7, 7), (8, 8);
|
||||
UPDATE repack_test SET i=50 where i=5;
|
||||
UPDATE repack_test SET j=60 where i=6;
|
||||
DELETE FROM repack_test WHERE i=7;
|
||||
|
||||
step change_subxact1:
|
||||
BEGIN;
|
||||
INSERT INTO repack_test(i, j) VALUES (100, 100);
|
||||
SAVEPOINT s1;
|
||||
UPDATE repack_test SET i=101 where i=100;
|
||||
SAVEPOINT s2;
|
||||
UPDATE repack_test SET i=102 where i=101;
|
||||
COMMIT;
|
||||
|
||||
step change_subxact2:
|
||||
BEGIN;
|
||||
SAVEPOINT s1;
|
||||
INSERT INTO repack_test(i, j) VALUES (110, 110);
|
||||
ROLLBACK TO SAVEPOINT s1;
|
||||
INSERT INTO repack_test(i, j) VALUES (110, 111);
|
||||
COMMIT;
|
||||
|
||||
step check2:
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
|
||||
|
||||
SELECT i, j FROM repack_test ORDER BY i, j;
|
||||
|
||||
INSERT INTO data_s2(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
|
||||
i| j
|
||||
---+---
|
||||
2| 20
|
||||
6| 60
|
||||
8| 8
|
||||
10| 1
|
||||
40| 3
|
||||
50| 5
|
||||
102|100
|
||||
110|111
|
||||
(8 rows)
|
||||
|
||||
step wakeup_before_lock:
|
||||
SELECT injection_points_wakeup('repack-concurrently-before-lock');
|
||||
|
||||
injection_points_wakeup
|
||||
-----------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step wait_before_lock: <... completed>
|
||||
step check1:
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
|
||||
|
||||
SELECT count(DISTINCT node) FROM relfilenodes;
|
||||
|
||||
SELECT i, j FROM repack_test ORDER BY i, j;
|
||||
|
||||
INSERT INTO data_s1(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
|
||||
SELECT count(*)
|
||||
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
|
||||
WHERE d1.i ISNULL OR d2.i ISNULL;
|
||||
|
||||
count
|
||||
-----
|
||||
2
|
||||
(1 row)
|
||||
|
||||
i| j
|
||||
---+---
|
||||
2| 20
|
||||
6| 60
|
||||
8| 8
|
||||
10| 1
|
||||
40| 3
|
||||
50| 5
|
||||
102|100
|
||||
110|111
|
||||
(8 rows)
|
||||
|
||||
count
|
||||
-----
|
||||
0
|
||||
(1 row)
|
||||
|
||||
injection_points_detach
|
||||
-----------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
65
src/test/modules/injection_points/expected/repack_toast.out
Normal file
65
src/test/modules/injection_points/expected/repack_toast.out
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: wait_before_lock change check2 wakeup_before_lock check1
|
||||
injection_points_attach
|
||||
-----------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step wait_before_lock:
|
||||
REPACK (CONCURRENTLY) repack_test;
|
||||
<waiting ...>
|
||||
step change:
|
||||
UPDATE repack_test SET j=get_long_string() where i=2;
|
||||
DELETE FROM repack_test WHERE i=3;
|
||||
INSERT INTO repack_test(i, j) VALUES (4, get_long_string());
|
||||
UPDATE repack_test SET i=3 where i=1;
|
||||
|
||||
step check2:
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT c2.relfilenode
|
||||
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
|
||||
WHERE c1.relname='repack_test';
|
||||
|
||||
INSERT INTO data_s2(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
|
||||
step wakeup_before_lock:
|
||||
SELECT injection_points_wakeup('repack-concurrently-before-lock');
|
||||
|
||||
injection_points_wakeup
|
||||
-----------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step wait_before_lock: <... completed>
|
||||
step check1:
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT c2.relfilenode
|
||||
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
|
||||
WHERE c1.relname='repack_test';
|
||||
|
||||
SELECT count(DISTINCT node) FROM relfilenodes;
|
||||
|
||||
INSERT INTO data_s1(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
|
||||
SELECT count(*)
|
||||
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
|
||||
WHERE d1.i ISNULL OR d2.i ISNULL;
|
||||
|
||||
count
|
||||
-----
|
||||
4
|
||||
(1 row)
|
||||
|
||||
count
|
||||
-----
|
||||
0
|
||||
(1 row)
|
||||
|
||||
injection_points_detach
|
||||
-----------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
|
|
@ -45,6 +45,8 @@ tests += {
|
|||
'specs': [
|
||||
'basic',
|
||||
'inplace',
|
||||
'repack',
|
||||
'repack_toast',
|
||||
'syscache-update-pruned',
|
||||
'heap_lock_update',
|
||||
],
|
||||
|
|
|
|||
142
src/test/modules/injection_points/specs/repack.spec
Normal file
142
src/test/modules/injection_points/specs/repack.spec
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
# REPACK (CONCURRENTLY) ... USING INDEX ...;
|
||||
setup
|
||||
{
|
||||
CREATE EXTENSION injection_points;
|
||||
|
||||
CREATE TABLE repack_test(i int PRIMARY KEY, j int);
|
||||
INSERT INTO repack_test(i, j) VALUES (1, 1), (2, 2), (3, 3), (4, 4);
|
||||
|
||||
CREATE TABLE relfilenodes(node oid);
|
||||
|
||||
CREATE TABLE data_s1(i int, j int);
|
||||
CREATE TABLE data_s2(i int, j int);
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE repack_test;
|
||||
DROP EXTENSION injection_points;
|
||||
|
||||
DROP TABLE relfilenodes;
|
||||
DROP TABLE data_s1;
|
||||
DROP TABLE data_s2;
|
||||
}
|
||||
|
||||
session s1
|
||||
setup
|
||||
{
|
||||
SELECT injection_points_set_local();
|
||||
SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
|
||||
}
|
||||
# Perform the initial load and wait for s2 to do some data changes.
|
||||
step wait_before_lock
|
||||
{
|
||||
REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
|
||||
}
|
||||
# Check the table from the perspective of s1.
|
||||
#
|
||||
# Besides the contents, we also check that relfilenode has changed.
|
||||
|
||||
# Have each session write the contents into a table and use FULL JOIN to check
|
||||
# if the outputs are identical.
|
||||
step check1
|
||||
{
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
|
||||
|
||||
SELECT count(DISTINCT node) FROM relfilenodes;
|
||||
|
||||
SELECT i, j FROM repack_test ORDER BY i, j;
|
||||
|
||||
INSERT INTO data_s1(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
|
||||
SELECT count(*)
|
||||
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
|
||||
WHERE d1.i ISNULL OR d2.i ISNULL;
|
||||
}
|
||||
teardown
|
||||
{
|
||||
SELECT injection_points_detach('repack-concurrently-before-lock');
|
||||
}
|
||||
|
||||
session s2
|
||||
# Change the existing data. UPDATE changes both key and non-key columns. Also
|
||||
# update one row twice to test whether tuple version generated by this session
|
||||
# can be found.
|
||||
step change_existing
|
||||
{
|
||||
UPDATE repack_test SET i=10 where i=1;
|
||||
UPDATE repack_test SET j=20 where i=2;
|
||||
UPDATE repack_test SET i=30 where i=3;
|
||||
UPDATE repack_test SET i=40 where i=30;
|
||||
DELETE FROM repack_test WHERE i=4;
|
||||
}
|
||||
# Insert new rows and UPDATE / DELETE some of them. Again, update both key and
|
||||
# non-key column.
|
||||
step change_new
|
||||
{
|
||||
INSERT INTO repack_test(i, j) VALUES (5, 5), (6, 6), (7, 7), (8, 8);
|
||||
UPDATE repack_test SET i=50 where i=5;
|
||||
UPDATE repack_test SET j=60 where i=6;
|
||||
DELETE FROM repack_test WHERE i=7;
|
||||
}
|
||||
|
||||
# When applying concurrent data changes, we should see the effects of an
|
||||
# in-progress subtransaction.
|
||||
#
|
||||
# XXX Not sure this test is useful now - it was designed for the patch that
|
||||
# preserves tuple visibility and which therefore modifies
|
||||
# TransactionIdIsCurrentTransactionId().
|
||||
step change_subxact1
|
||||
{
|
||||
BEGIN;
|
||||
INSERT INTO repack_test(i, j) VALUES (100, 100);
|
||||
SAVEPOINT s1;
|
||||
UPDATE repack_test SET i=101 where i=100;
|
||||
SAVEPOINT s2;
|
||||
UPDATE repack_test SET i=102 where i=101;
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
# When applying concurrent data changes, we should not see the effects of a
|
||||
# rolled back subtransaction.
|
||||
#
|
||||
# XXX Is this test useful? See above.
|
||||
step change_subxact2
|
||||
{
|
||||
BEGIN;
|
||||
SAVEPOINT s1;
|
||||
INSERT INTO repack_test(i, j) VALUES (110, 110);
|
||||
ROLLBACK TO SAVEPOINT s1;
|
||||
INSERT INTO repack_test(i, j) VALUES (110, 111);
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
# Check the table from the perspective of s2.
|
||||
step check2
|
||||
{
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT relfilenode FROM pg_class WHERE relname='repack_test';
|
||||
|
||||
SELECT i, j FROM repack_test ORDER BY i, j;
|
||||
|
||||
INSERT INTO data_s2(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
}
|
||||
step wakeup_before_lock
|
||||
{
|
||||
SELECT injection_points_wakeup('repack-concurrently-before-lock');
|
||||
}
|
||||
|
||||
# Test if data changes introduced while one session is performing REPACK
|
||||
# CONCURRENTLY find their way into the table.
|
||||
permutation
|
||||
wait_before_lock
|
||||
change_existing
|
||||
change_new
|
||||
change_subxact1
|
||||
change_subxact2
|
||||
check2
|
||||
wakeup_before_lock
|
||||
check1
|
||||
112
src/test/modules/injection_points/specs/repack_toast.spec
Normal file
112
src/test/modules/injection_points/specs/repack_toast.spec
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
# REPACK (CONCURRENTLY);
|
||||
#
|
||||
# Test handling of TOAST. At the same time, no tuplesort.
|
||||
setup
|
||||
{
|
||||
CREATE EXTENSION injection_points;
|
||||
|
||||
-- Return a string that needs to be TOASTed.
|
||||
CREATE FUNCTION get_long_string()
|
||||
RETURNS text
|
||||
LANGUAGE sql as $$
|
||||
SELECT string_agg(chr(65 + trunc(25 * random())::int), '')
|
||||
FROM generate_series(1, 2048) s(x);
|
||||
$$;
|
||||
|
||||
CREATE TABLE repack_test(i int PRIMARY KEY, j text);
|
||||
INSERT INTO repack_test(i, j) VALUES (1, get_long_string()),
|
||||
(2, get_long_string()), (3, get_long_string());
|
||||
|
||||
CREATE TABLE relfilenodes(node oid);
|
||||
|
||||
CREATE TABLE data_s1(i int, j text);
|
||||
CREATE TABLE data_s2(i int, j text);
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE repack_test;
|
||||
DROP EXTENSION injection_points;
|
||||
DROP FUNCTION get_long_string();
|
||||
|
||||
DROP TABLE relfilenodes;
|
||||
DROP TABLE data_s1;
|
||||
DROP TABLE data_s2;
|
||||
}
|
||||
|
||||
session s1
|
||||
setup
|
||||
{
|
||||
SELECT injection_points_set_local();
|
||||
SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
|
||||
}
|
||||
# Perform the initial load and wait for s2 to do some data changes.
|
||||
step wait_before_lock
|
||||
{
|
||||
REPACK (CONCURRENTLY) repack_test;
|
||||
}
|
||||
# Check the table from the perspective of s1.
|
||||
#
|
||||
# Besides the contents, we also check that relfilenode has changed.
|
||||
|
||||
# Have each session write the contents into a table and use FULL JOIN to check
|
||||
# if the outputs are identical.
|
||||
step check1
|
||||
{
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT c2.relfilenode
|
||||
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
|
||||
WHERE c1.relname='repack_test';
|
||||
|
||||
SELECT count(DISTINCT node) FROM relfilenodes;
|
||||
|
||||
INSERT INTO data_s1(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
|
||||
SELECT count(*)
|
||||
FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
|
||||
WHERE d1.i ISNULL OR d2.i ISNULL;
|
||||
}
|
||||
teardown
|
||||
{
|
||||
SELECT injection_points_detach('repack-concurrently-before-lock');
|
||||
}
|
||||
|
||||
session s2
|
||||
step change
|
||||
# Separately test UPDATE of both plain ("i") and TOASTed ("j") attribute. In
|
||||
# the first case, the new tuple we get from reorderbuffer.c contains "j" as a
|
||||
# TOAST pointer, which we need to update so it points to the new heap. In the
|
||||
# latter case, we receive "j" as "external indirect" value - here we test that
|
||||
# the decoding worker writes the tuple to a file correctly and that the
|
||||
# backend executing REPACK manages to restore it.
|
||||
{
|
||||
UPDATE repack_test SET j=get_long_string() where i=2;
|
||||
DELETE FROM repack_test WHERE i=3;
|
||||
INSERT INTO repack_test(i, j) VALUES (4, get_long_string());
|
||||
UPDATE repack_test SET i=3 where i=1;
|
||||
}
|
||||
# Check the table from the perspective of s2.
|
||||
step check2
|
||||
{
|
||||
INSERT INTO relfilenodes(node)
|
||||
SELECT c2.relfilenode
|
||||
FROM pg_class c1 JOIN pg_class c2 ON c2.oid = c1.oid OR c2.oid = c1.reltoastrelid
|
||||
WHERE c1.relname='repack_test';
|
||||
|
||||
INSERT INTO data_s2(i, j)
|
||||
SELECT i, j FROM repack_test;
|
||||
}
|
||||
step wakeup_before_lock
|
||||
{
|
||||
SELECT injection_points_wakeup('repack-concurrently-before-lock');
|
||||
}
|
||||
|
||||
# Test if data changes introduced while one session is performing REPACK
|
||||
# CONCURRENTLY find their way into the table.
|
||||
permutation
|
||||
wait_before_lock
|
||||
change
|
||||
check2
|
||||
wakeup_before_lock
|
||||
check1
|
||||
|
|
@ -537,6 +537,10 @@ SELECT relname, old.level, old.relkind, old.relfilenode = new.relfilenode FROM o
|
|||
clstrpart33 | 2 | r | f
|
||||
(7 rows)
|
||||
|
||||
-- CONCURRENTLY doesn't like partitioned tables
|
||||
REPACK (CONCURRENTLY) clstrpart;
|
||||
ERROR: REPACK (CONCURRENTLY) is not supported for partitioned tables
|
||||
HINT: Consider running the command on individual partitions.
|
||||
DROP TABLE clstrpart;
|
||||
-- Ownership of partitions is checked
|
||||
CREATE TABLE ptnowner(i int unique) PARTITION BY LIST (i);
|
||||
|
|
@ -802,6 +806,10 @@ ORDER BY o.relname;
|
|||
clstr_3
|
||||
(2 rows)
|
||||
|
||||
-- concurrently
|
||||
REPACK (CONCURRENTLY) pg_class;
|
||||
ERROR: cannot repack relation "pg_class"
|
||||
HINT: REPACK CONCURRENTLY is not supported for catalog relations.
|
||||
-- clean up
|
||||
DROP TABLE clustertest;
|
||||
DROP TABLE clstr_1;
|
||||
|
|
|
|||
|
|
@ -2021,7 +2021,7 @@ pg_stat_progress_cluster| SELECT pid,
|
|||
phase,
|
||||
repack_index_relid AS cluster_index_relid,
|
||||
heap_tuples_scanned,
|
||||
heap_tuples_written,
|
||||
(heap_tuples_inserted + heap_tuples_updated) AS heap_tuples_written,
|
||||
heap_blks_total,
|
||||
heap_blks_scanned,
|
||||
index_rebuild_count
|
||||
|
|
@ -2136,17 +2136,20 @@ pg_stat_progress_repack| SELECT s.pid,
|
|||
WHEN 2 THEN 'index scanning heap'::text
|
||||
WHEN 3 THEN 'sorting tuples'::text
|
||||
WHEN 4 THEN 'writing new heap'::text
|
||||
WHEN 5 THEN 'swapping relation files'::text
|
||||
WHEN 6 THEN 'rebuilding index'::text
|
||||
WHEN 7 THEN 'performing final cleanup'::text
|
||||
WHEN 5 THEN 'catch-up'::text
|
||||
WHEN 6 THEN 'swapping relation files'::text
|
||||
WHEN 7 THEN 'rebuilding index'::text
|
||||
WHEN 8 THEN 'performing final cleanup'::text
|
||||
ELSE NULL::text
|
||||
END AS phase,
|
||||
(s.param3)::oid AS repack_index_relid,
|
||||
s.param4 AS heap_tuples_scanned,
|
||||
s.param5 AS heap_tuples_written,
|
||||
s.param6 AS heap_blks_total,
|
||||
s.param7 AS heap_blks_scanned,
|
||||
s.param8 AS index_rebuild_count
|
||||
s.param5 AS heap_tuples_inserted,
|
||||
s.param6 AS heap_tuples_updated,
|
||||
s.param7 AS heap_tuples_deleted,
|
||||
s.param8 AS heap_blks_total,
|
||||
s.param9 AS heap_blks_scanned,
|
||||
s.param10 AS index_rebuild_count
|
||||
FROM (pg_stat_get_progress_info('REPACK'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
|
||||
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
|
||||
pg_stat_progress_vacuum| SELECT s.pid,
|
||||
|
|
|
|||
|
|
@ -248,6 +248,9 @@ REPACK clstrpart;
|
|||
CREATE TEMP TABLE new_cluster_info AS SELECT relname, level, relfilenode, relkind FROM pg_partition_tree('clstrpart'::regclass) AS tree JOIN pg_class c ON c.oid=tree.relid ;
|
||||
SELECT relname, old.level, old.relkind, old.relfilenode = new.relfilenode FROM old_cluster_info AS old JOIN new_cluster_info AS new USING (relname) ORDER BY relname COLLATE "C";
|
||||
|
||||
-- CONCURRENTLY doesn't like partitioned tables
|
||||
REPACK (CONCURRENTLY) clstrpart;
|
||||
|
||||
DROP TABLE clstrpart;
|
||||
|
||||
-- Ownership of partitions is checked
|
||||
|
|
@ -383,6 +386,9 @@ JOIN relnodes_new n ON o.relname = n.relname
|
|||
WHERE o.relfilenode <> n.relfilenode
|
||||
ORDER BY o.relname;
|
||||
|
||||
-- concurrently
|
||||
REPACK (CONCURRENTLY) pg_class;
|
||||
|
||||
-- clean up
|
||||
DROP TABLE clustertest;
|
||||
DROP TABLE clstr_1;
|
||||
|
|
|
|||
|
|
@ -430,6 +430,7 @@ CatCacheHeader
|
|||
CatalogId
|
||||
CatalogIdMapEntry
|
||||
CatalogIndexState
|
||||
ChangeContext
|
||||
ChangeVarNodes_callback
|
||||
ChangeVarNodes_context
|
||||
ChannelName
|
||||
|
|
@ -509,6 +510,7 @@ CompressFileHandle
|
|||
CompressionLocation
|
||||
CompressorState
|
||||
ComputeXidHorizonsResult
|
||||
ConcurrentChangeKind
|
||||
ConditionVariable
|
||||
ConditionVariableMinimallyPadded
|
||||
ConditionalStack
|
||||
|
|
@ -655,6 +657,8 @@ DeclareCursorStmt
|
|||
DecodedBkpBlock
|
||||
DecodedXLogRecord
|
||||
DecodingOutputState
|
||||
DecodingWorker
|
||||
DecodingWorkerShared
|
||||
DefElem
|
||||
DefElemAction
|
||||
DefaultACLInfo
|
||||
|
|
@ -1318,6 +1322,7 @@ IndexElem
|
|||
IndexFetchHeapData
|
||||
IndexFetchTableData
|
||||
IndexInfo
|
||||
IndexInsertState
|
||||
IndexList
|
||||
IndexOnlyScan
|
||||
IndexOnlyScanState
|
||||
|
|
@ -2635,6 +2640,7 @@ ReorderBufferTupleCidKey
|
|||
ReorderBufferUpdateProgressTxnCB
|
||||
ReorderTuple
|
||||
RepackCommand
|
||||
RepackDecodingState
|
||||
RepackStmt
|
||||
ReparameterizeForeignPathByChild_function
|
||||
ReplOriginId
|
||||
|
|
|
|||
Loading…
Reference in a new issue