Allocate separate DSM chunk for parallel Index[Only]Scan instrumentation

Previously, parallel index and index-only scans packed the parallel scan
descriptor and shared instrumentation (for EXPLAIN ANALYZE) into a
single DSM allocation. Since scans may be instrumented without being
parallel-aware, and vice versa, using separate DSM chunks -- each with
its own TOC key -- is cleaner. A future commit will extend this pattern
to other scan node types.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Tomas Vondra <tomas@vondra.me>
Discussion: https://postgr.es/m/flat/a177a6dd-240b-455a-8f25-aca0b1c08c6e%40vondra.me
This commit is contained in:
Melanie Plageman 2026-04-06 19:10:19 -04:00
parent 43222b8e53
commit dd78e69cfc
10 changed files with 202 additions and 154 deletions

View file

@ -463,43 +463,26 @@ index_restrpos(IndexScanDesc scan)
}
/*
* index_parallelscan_estimate - estimate shared memory for parallel scan
*
* When instrument=true, estimate includes SharedIndexScanInstrumentation
* space. When parallel_aware=true, estimate includes whatever space the
* index AM's amestimateparallelscan routine requested when called.
* Estimates the shared memory needed for parallel scan, including any
* AM-specific parallel scan state.
*/
Size
index_parallelscan_estimate(Relation indexRelation, int nkeys, int norderbys,
Snapshot snapshot, bool instrument,
bool parallel_aware, int nworkers)
Snapshot snapshot)
{
Size nbytes;
Assert(instrument || parallel_aware);
RELATION_CHECKS;
nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot));
nbytes = MAXALIGN(nbytes);
if (instrument)
{
Size sharedinfosz;
sharedinfosz = offsetof(SharedIndexScanInstrumentation, winstrument) +
nworkers * sizeof(IndexScanInstrumentation);
nbytes = add_size(nbytes, sharedinfosz);
nbytes = MAXALIGN(nbytes);
}
/*
* If parallel scan index AM interface can't be used (or index AM provides
* no such interface), assume there is no AM-specific data needed
*/
if (parallel_aware &&
indexRelation->rd_indam->amestimateparallelscan != NULL)
if (indexRelation->rd_indam->amestimateparallelscan != NULL)
nbytes = add_size(nbytes,
indexRelation->rd_indam->amestimateparallelscan(indexRelation,
nkeys,
@ -520,15 +503,11 @@ index_parallelscan_estimate(Relation indexRelation, int nkeys, int norderbys,
*/
void
index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
Snapshot snapshot, bool instrument,
bool parallel_aware, int nworkers,
SharedIndexScanInstrumentation **sharedinfo,
Snapshot snapshot,
ParallelIndexScanDesc target)
{
Size offset;
Assert(instrument || parallel_aware);
RELATION_CHECKS;
offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
@ -537,29 +516,11 @@ index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
target->ps_locator = heapRelation->rd_locator;
target->ps_indexlocator = indexRelation->rd_locator;
target->ps_offset_ins = 0;
target->ps_offset_am = 0;
SerializeSnapshot(snapshot, target->ps_snapshot_data);
if (instrument)
{
Size sharedinfosz;
target->ps_offset_ins = offset;
sharedinfosz = offsetof(SharedIndexScanInstrumentation, winstrument) +
nworkers * sizeof(IndexScanInstrumentation);
offset = add_size(offset, sharedinfosz);
offset = MAXALIGN(offset);
/* Set leader's *sharedinfo pointer, and initialize stats */
*sharedinfo = (SharedIndexScanInstrumentation *)
OffsetToPointer(target, target->ps_offset_ins);
memset(*sharedinfo, 0, sharedinfosz);
(*sharedinfo)->num_workers = nworkers;
}
/* aminitparallelscan is optional; assume no-op if not provided by AM */
if (parallel_aware && indexRelation->rd_indam->aminitparallelscan != NULL)
if (indexRelation->rd_indam->aminitparallelscan != NULL)
{
void *amtarget;

View file

@ -259,14 +259,20 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
e->pcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanEstimate((IndexScanState *) planstate,
e->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIndexScanEstimate((IndexScanState *) planstate,
e->pcxt);
ExecIndexScanInstrumentEstimate((IndexScanState *) planstate,
e->pcxt);
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
e->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
e->pcxt);
ExecIndexOnlyScanInstrumentEstimate((IndexOnlyScanState *) planstate,
e->pcxt);
break;
case T_BitmapIndexScanState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
@ -493,13 +499,20 @@ ExecParallelInitializeDSM(PlanState *planstate,
d->pcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanInitializeDSM((IndexScanState *) planstate,
d->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
ExecIndexScanInstrumentInitDSM((IndexScanState *) planstate,
d->pcxt);
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
d->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
d->pcxt);
ExecIndexOnlyScanInstrumentInitDSM((IndexOnlyScanState *) planstate,
d->pcxt);
break;
case T_BitmapIndexScanState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
@ -1371,13 +1384,20 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanInitializeWorker((IndexScanState *) planstate,
pwcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
ExecIndexScanInstrumentInitWorker((IndexScanState *) planstate,
pwcxt);
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
pwcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
pwcxt);
ExecIndexOnlyScanInstrumentInitWorker((IndexOnlyScanState *) planstate,
pwcxt);
break;
case T_BitmapIndexScanState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */

View file

@ -394,7 +394,9 @@ ExecBitmapIndexScanInitializeDSM(BitmapIndexScanState *node,
node->biss_SharedInfo =
(SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc,
size);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
shm_toc_insert(pcxt->toc,
node->ss.ps.plan->plan_node_id +
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
node->biss_SharedInfo);
/* Each per-worker area must start out as zeroes */
@ -417,7 +419,10 @@ ExecBitmapIndexScanInitializeWorker(BitmapIndexScanState *node,
return;
node->biss_SharedInfo = (SharedIndexScanInstrumentation *)
shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
shm_toc_lookup(pwcxt->toc,
node->ss.ps.plan->plan_node_id +
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
false);
}
/* ----------------------------------------------------------------

View file

@ -736,21 +736,11 @@ ExecIndexOnlyScanEstimate(IndexOnlyScanState *node,
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
bool instrument = (node->ss.ps.instrument != NULL);
bool parallel_aware = node->ss.ps.plan->parallel_aware;
if (!instrument && !parallel_aware)
{
/* No DSM required by the scan */
return;
}
node->ioss_PscanLen = index_parallelscan_estimate(node->ioss_RelationDesc,
node->ioss_NumScanKeys,
node->ioss_NumOrderByKeys,
estate->es_snapshot,
instrument, parallel_aware,
pcxt->nworkers);
estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->ioss_PscanLen);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
@ -767,29 +757,14 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
{
EState *estate = node->ss.ps.state;
ParallelIndexScanDesc piscan;
bool instrument = node->ss.ps.instrument != NULL;
bool parallel_aware = node->ss.ps.plan->parallel_aware;
if (!instrument && !parallel_aware)
{
/* No DSM required by the scan */
return;
}
piscan = shm_toc_allocate(pcxt->toc, node->ioss_PscanLen);
index_parallelscan_initialize(node->ss.ss_currentRelation,
node->ioss_RelationDesc,
estate->es_snapshot,
instrument, parallel_aware, pcxt->nworkers,
&node->ioss_SharedInfo, piscan);
piscan);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
if (!parallel_aware)
{
/* Only here to initialize SharedInfo in DSM */
return;
}
node->ioss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->ioss_RelationDesc,
@ -837,27 +812,9 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
ParallelWorkerContext *pwcxt)
{
ParallelIndexScanDesc piscan;
bool instrument = node->ss.ps.instrument != NULL;
bool parallel_aware = node->ss.ps.plan->parallel_aware;
if (!instrument && !parallel_aware)
{
/* No DSM required by the scan */
return;
}
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
if (instrument)
node->ioss_SharedInfo = (SharedIndexScanInstrumentation *)
OffsetToPointer(piscan, piscan->ps_offset_ins);
if (!parallel_aware)
{
/* Only here to set up worker node's SharedInfo */
return;
}
node->ioss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->ioss_RelationDesc,
@ -879,6 +836,73 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
}
/*
 * ExecIndexOnlyScanInstrumentEstimate
 *
 * Tell pcxt->estimator how much DSM the shared instrumentation chunk needs:
 * the SharedIndexScanInstrumentation header up to winstrument, followed by
 * pcxt->nworkers IndexScanInstrumentation entries, plus one TOC key.
 *
 * Nothing is reserved when the node is not instrumented or when
 * pcxt->nworkers is zero.
 */
void
ExecIndexOnlyScanInstrumentEstimate(IndexOnlyScanState *node,
									ParallelContext *pcxt)
{
	Size		shared_sz;

	if (!node->ss.ps.instrument)
		return;
	if (pcxt->nworkers == 0)
		return;

	/*
	 * The result is not cached in the IndexOnlyScanState; it is cheap enough
	 * that ExecIndexOnlyScanInstrumentInitDSM() simply computes it again.
	 */
	shared_sz = offsetof(SharedIndexScanInstrumentation, winstrument);
	shared_sz += pcxt->nworkers * sizeof(IndexScanInstrumentation);

	shm_toc_estimate_chunk(&pcxt->estimator, shared_sz);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/*
 * ExecIndexOnlyScanInstrumentInitDSM
 *
 * Allocate the shared instrumentation chunk for an index-only scan from
 * pcxt->toc, zero it, record the worker count in it, remember it in
 * node->ioss_SharedInfo, and publish it in the TOC under
 * plan_node_id + PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET.
 *
 * Does nothing when the node is not instrumented or when pcxt->nworkers is
 * zero.
 */
void
ExecIndexOnlyScanInstrumentInitDSM(IndexOnlyScanState *node,
								   ParallelContext *pcxt)
{
	SharedIndexScanInstrumentation *shared;
	Size		shared_sz;

	if (!node->ss.ps.instrument)
		return;
	if (pcxt->nworkers == 0)
		return;

	/* Same computation as in ExecIndexOnlyScanInstrumentEstimate() */
	shared_sz = offsetof(SharedIndexScanInstrumentation, winstrument);
	shared_sz += pcxt->nworkers * sizeof(IndexScanInstrumentation);

	shared = (SharedIndexScanInstrumentation *)
		shm_toc_allocate(pcxt->toc, shared_sz);
	node->ioss_SharedInfo = shared;

	/* Every per-worker slot has to begin life zeroed */
	memset(shared, 0, shared_sz);
	shared->num_workers = pcxt->nworkers;

	shm_toc_insert(pcxt->toc,
				   node->ss.ps.plan->plan_node_id +
				   PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
				   shared);
}
/*
 * ExecIndexOnlyScanInstrumentInitWorker
 *
 * When the node is instrumented, find the shared instrumentation chunk in
 * pwcxt->toc under plan_node_id + PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET and
 * store its address in node->ioss_SharedInfo.  Otherwise do nothing.
 */
void
ExecIndexOnlyScanInstrumentInitWorker(IndexOnlyScanState *node,
									  ParallelWorkerContext *pwcxt)
{
	if (node->ss.ps.instrument)
	{
		void	   *chunk;

		chunk = shm_toc_lookup(pwcxt->toc,
							   node->ss.ps.plan->plan_node_id +
							   PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
							   false);
		node->ioss_SharedInfo = (SharedIndexScanInstrumentation *) chunk;
	}
}
/* ----------------------------------------------------------------
* ExecIndexOnlyScanRetrieveInstrumentation
*

View file

@ -1674,21 +1674,11 @@ ExecIndexScanEstimate(IndexScanState *node,
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
bool instrument = node->ss.ps.instrument != NULL;
bool parallel_aware = node->ss.ps.plan->parallel_aware;
if (!instrument && !parallel_aware)
{
/* No DSM required by the scan */
return;
}
node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc,
node->iss_NumScanKeys,
node->iss_NumOrderByKeys,
estate->es_snapshot,
instrument, parallel_aware,
pcxt->nworkers);
estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->iss_PscanLen);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
@ -1705,29 +1695,14 @@ ExecIndexScanInitializeDSM(IndexScanState *node,
{
EState *estate = node->ss.ps.state;
ParallelIndexScanDesc piscan;
bool instrument = node->ss.ps.instrument != NULL;
bool parallel_aware = node->ss.ps.plan->parallel_aware;
if (!instrument && !parallel_aware)
{
/* No DSM required by the scan */
return;
}
piscan = shm_toc_allocate(pcxt->toc, node->iss_PscanLen);
index_parallelscan_initialize(node->ss.ss_currentRelation,
node->iss_RelationDesc,
estate->es_snapshot,
instrument, parallel_aware, pcxt->nworkers,
&node->iss_SharedInfo, piscan);
piscan);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
if (!parallel_aware)
{
/* Only here to initialize SharedInfo in DSM */
return;
}
node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc,
@ -1773,27 +1748,9 @@ ExecIndexScanInitializeWorker(IndexScanState *node,
ParallelWorkerContext *pwcxt)
{
ParallelIndexScanDesc piscan;
bool instrument = node->ss.ps.instrument != NULL;
bool parallel_aware = node->ss.ps.plan->parallel_aware;
if (!instrument && !parallel_aware)
{
/* No DSM required by the scan */
return;
}
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
if (instrument)
node->iss_SharedInfo = (SharedIndexScanInstrumentation *)
OffsetToPointer(piscan, piscan->ps_offset_ins);
if (!parallel_aware)
{
/* Only here to set up worker node's SharedInfo */
return;
}
node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc,
@ -1814,6 +1771,73 @@ ExecIndexScanInitializeWorker(IndexScanState *node,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
}
/*
 * ExecIndexScanInstrumentEstimate
 *
 * Tell pcxt->estimator how much DSM the shared instrumentation chunk needs:
 * the SharedIndexScanInstrumentation header up to winstrument, followed by
 * pcxt->nworkers IndexScanInstrumentation entries, plus one TOC key.
 *
 * Nothing is reserved when the node is not instrumented or when
 * pcxt->nworkers is zero.
 */
void
ExecIndexScanInstrumentEstimate(IndexScanState *node,
								ParallelContext *pcxt)
{
	Size		shared_sz;

	if (!node->ss.ps.instrument)
		return;
	if (pcxt->nworkers == 0)
		return;

	/*
	 * The result is not cached in the IndexScanState; it is cheap enough that
	 * ExecIndexScanInstrumentInitDSM() simply computes it again.
	 */
	shared_sz = offsetof(SharedIndexScanInstrumentation, winstrument);
	shared_sz += pcxt->nworkers * sizeof(IndexScanInstrumentation);

	shm_toc_estimate_chunk(&pcxt->estimator, shared_sz);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/*
 * ExecIndexScanInstrumentInitDSM
 *
 * Allocate the shared instrumentation chunk for an index scan from
 * pcxt->toc, zero it, record the worker count in it, remember it in
 * node->iss_SharedInfo, and publish it in the TOC under
 * plan_node_id + PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET.
 *
 * Does nothing when the node is not instrumented or when pcxt->nworkers is
 * zero.
 */
void
ExecIndexScanInstrumentInitDSM(IndexScanState *node,
							   ParallelContext *pcxt)
{
	SharedIndexScanInstrumentation *shared;
	Size		shared_sz;

	if (!node->ss.ps.instrument)
		return;
	if (pcxt->nworkers == 0)
		return;

	/* Same computation as in ExecIndexScanInstrumentEstimate() */
	shared_sz = offsetof(SharedIndexScanInstrumentation, winstrument);
	shared_sz += pcxt->nworkers * sizeof(IndexScanInstrumentation);

	shared = (SharedIndexScanInstrumentation *)
		shm_toc_allocate(pcxt->toc, shared_sz);
	node->iss_SharedInfo = shared;

	/* Every per-worker slot has to begin life zeroed */
	memset(shared, 0, shared_sz);
	shared->num_workers = pcxt->nworkers;

	shm_toc_insert(pcxt->toc,
				   node->ss.ps.plan->plan_node_id +
				   PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
				   shared);
}
/*
 * ExecIndexScanInstrumentInitWorker
 *
 * When the node is instrumented, find the shared instrumentation chunk in
 * pwcxt->toc under plan_node_id + PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET and
 * store its address in node->iss_SharedInfo.  Otherwise do nothing.
 */
void
ExecIndexScanInstrumentInitWorker(IndexScanState *node,
								  ParallelWorkerContext *pwcxt)
{
	if (node->ss.ps.instrument)
	{
		void	   *chunk;

		chunk = shm_toc_lookup(pwcxt->toc,
							   node->ss.ps.plan->plan_node_id +
							   PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
							   false);
		node->iss_SharedInfo = (SharedIndexScanInstrumentation *) chunk;
	}
}
/* ----------------------------------------------------------------
* ExecIndexScanRetrieveInstrumentation
*

View file

@ -171,14 +171,9 @@ extern void index_endscan(IndexScanDesc scan);
extern void index_markpos(IndexScanDesc scan);
extern void index_restrpos(IndexScanDesc scan);
extern Size index_parallelscan_estimate(Relation indexRelation,
int nkeys, int norderbys, Snapshot snapshot,
bool instrument, bool parallel_aware,
int nworkers);
int nkeys, int norderbys, Snapshot snapshot);
extern void index_parallelscan_initialize(Relation heapRelation,
Relation indexRelation, Snapshot snapshot,
bool instrument, bool parallel_aware,
int nworkers,
SharedIndexScanInstrumentation **sharedinfo,
ParallelIndexScanDesc target);
extern void index_parallelrescan(IndexScanDesc scan);
extern IndexScanDesc index_beginscan_parallel(Relation heaprel,

View file

@ -203,7 +203,6 @@ typedef struct ParallelIndexScanDescData
{
RelFileLocator ps_locator; /* physical table relation to scan */
RelFileLocator ps_indexlocator; /* physical index relation to scan */
Size ps_offset_ins; /* Offset to SharedIndexScanInstrumentation */
Size ps_offset_am; /* Offset to am-specific structure */
char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelIndexScanDescData;

View file

@ -18,6 +18,14 @@
#ifndef INSTRUMENT_NODE_H
#define INSTRUMENT_NODE_H
/*
 * Offset added to plan_node_id to create a second TOC key for per-worker scan
 * instrumentation. Instrumentation and parallel-awareness are independent, so
 * separate DSM chunks let each be allocated and initialized only when needed.
 *
 * The leader inserts the instrumentation chunk into the TOC under
 * plan_node_id + PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET, and each worker looks
 * it up under that same key; the plain plan_node_id key remains available
 * for the node's parallel scan descriptor.
 *
 * In the future, if nodes need more DSM allocations, we would need a more
 * robust system.
 */
#define PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET UINT64CONST(0xD000000000000000)
/* ---------------------
* Instrumentation information for aggregate function execution

View file

@ -32,6 +32,12 @@ extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecIndexOnlyScanInstrumentEstimate(IndexOnlyScanState *node,
ParallelContext *pcxt);
extern void ExecIndexOnlyScanInstrumentInitDSM(IndexOnlyScanState *node,
ParallelContext *pcxt);
extern void ExecIndexOnlyScanInstrumentInitWorker(IndexOnlyScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node);
#endif /* NODEINDEXONLYSCAN_H */

View file

@ -28,6 +28,12 @@ extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pc
extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeWorker(IndexScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecIndexScanInstrumentEstimate(IndexScanState *node,
ParallelContext *pcxt);
extern void ExecIndexScanInstrumentInitDSM(IndexScanState *node,
ParallelContext *pcxt);
extern void ExecIndexScanInstrumentInitWorker(IndexScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecIndexScanRetrieveInstrumentation(IndexScanState *node);
/*