mirror of
https://github.com/postgres/postgres.git
synced 2026-04-15 22:10:45 -04:00
Allocate separate DSM chunk for parallel Index[Only]Scan instrumentation
Previously, parallel index and index-only scans packed the parallel scan descriptor and shared instrumentation (for EXPLAIN ANALYZE) into a single DSM allocation. Since scans may be instrumented without being parallel-aware, and vice versa, using separate DSM chunks -- each with its own TOC key -- is cleaner. A future commit will extend this pattern to other scan node types. Author: Melanie Plageman <melanieplageman@gmail.com> Reviewed-by: Tomas Vondra <tomas@vondra.me> Discussion: https://postgr.es/m/flat/a177a6dd-240b-455a-8f25-aca0b1c08c6e%40vondra.me
This commit is contained in:
parent
43222b8e53
commit
dd78e69cfc
10 changed files with 202 additions and 154 deletions
|
|
@ -463,43 +463,26 @@ index_restrpos(IndexScanDesc scan)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* index_parallelscan_estimate - estimate shared memory for parallel scan
|
* Estimates the shared memory needed for parallel scan, including any
|
||||||
*
|
* AM-specific parallel scan state.
|
||||||
* When instrument=true, estimate includes SharedIndexScanInstrumentation
|
|
||||||
* space. When parallel_aware=true, estimate includes whatever space the
|
|
||||||
* index AM's amestimateparallelscan routine requested when called.
|
|
||||||
*/
|
*/
|
||||||
Size
|
Size
|
||||||
index_parallelscan_estimate(Relation indexRelation, int nkeys, int norderbys,
|
index_parallelscan_estimate(Relation indexRelation, int nkeys, int norderbys,
|
||||||
Snapshot snapshot, bool instrument,
|
Snapshot snapshot)
|
||||||
bool parallel_aware, int nworkers)
|
|
||||||
{
|
{
|
||||||
Size nbytes;
|
Size nbytes;
|
||||||
|
|
||||||
Assert(instrument || parallel_aware);
|
|
||||||
|
|
||||||
RELATION_CHECKS;
|
RELATION_CHECKS;
|
||||||
|
|
||||||
nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
|
nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
|
||||||
nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot));
|
nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot));
|
||||||
nbytes = MAXALIGN(nbytes);
|
nbytes = MAXALIGN(nbytes);
|
||||||
|
|
||||||
if (instrument)
|
|
||||||
{
|
|
||||||
Size sharedinfosz;
|
|
||||||
|
|
||||||
sharedinfosz = offsetof(SharedIndexScanInstrumentation, winstrument) +
|
|
||||||
nworkers * sizeof(IndexScanInstrumentation);
|
|
||||||
nbytes = add_size(nbytes, sharedinfosz);
|
|
||||||
nbytes = MAXALIGN(nbytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If parallel scan index AM interface can't be used (or index AM provides
|
* If parallel scan index AM interface can't be used (or index AM provides
|
||||||
* no such interface), assume there is no AM-specific data needed
|
* no such interface), assume there is no AM-specific data needed
|
||||||
*/
|
*/
|
||||||
if (parallel_aware &&
|
if (indexRelation->rd_indam->amestimateparallelscan != NULL)
|
||||||
indexRelation->rd_indam->amestimateparallelscan != NULL)
|
|
||||||
nbytes = add_size(nbytes,
|
nbytes = add_size(nbytes,
|
||||||
indexRelation->rd_indam->amestimateparallelscan(indexRelation,
|
indexRelation->rd_indam->amestimateparallelscan(indexRelation,
|
||||||
nkeys,
|
nkeys,
|
||||||
|
|
@ -520,15 +503,11 @@ index_parallelscan_estimate(Relation indexRelation, int nkeys, int norderbys,
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
|
index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
|
||||||
Snapshot snapshot, bool instrument,
|
Snapshot snapshot,
|
||||||
bool parallel_aware, int nworkers,
|
|
||||||
SharedIndexScanInstrumentation **sharedinfo,
|
|
||||||
ParallelIndexScanDesc target)
|
ParallelIndexScanDesc target)
|
||||||
{
|
{
|
||||||
Size offset;
|
Size offset;
|
||||||
|
|
||||||
Assert(instrument || parallel_aware);
|
|
||||||
|
|
||||||
RELATION_CHECKS;
|
RELATION_CHECKS;
|
||||||
|
|
||||||
offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
|
offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
|
||||||
|
|
@ -537,29 +516,11 @@ index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
|
||||||
|
|
||||||
target->ps_locator = heapRelation->rd_locator;
|
target->ps_locator = heapRelation->rd_locator;
|
||||||
target->ps_indexlocator = indexRelation->rd_locator;
|
target->ps_indexlocator = indexRelation->rd_locator;
|
||||||
target->ps_offset_ins = 0;
|
|
||||||
target->ps_offset_am = 0;
|
target->ps_offset_am = 0;
|
||||||
SerializeSnapshot(snapshot, target->ps_snapshot_data);
|
SerializeSnapshot(snapshot, target->ps_snapshot_data);
|
||||||
|
|
||||||
if (instrument)
|
|
||||||
{
|
|
||||||
Size sharedinfosz;
|
|
||||||
|
|
||||||
target->ps_offset_ins = offset;
|
|
||||||
sharedinfosz = offsetof(SharedIndexScanInstrumentation, winstrument) +
|
|
||||||
nworkers * sizeof(IndexScanInstrumentation);
|
|
||||||
offset = add_size(offset, sharedinfosz);
|
|
||||||
offset = MAXALIGN(offset);
|
|
||||||
|
|
||||||
/* Set leader's *sharedinfo pointer, and initialize stats */
|
|
||||||
*sharedinfo = (SharedIndexScanInstrumentation *)
|
|
||||||
OffsetToPointer(target, target->ps_offset_ins);
|
|
||||||
memset(*sharedinfo, 0, sharedinfosz);
|
|
||||||
(*sharedinfo)->num_workers = nworkers;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* aminitparallelscan is optional; assume no-op if not provided by AM */
|
/* aminitparallelscan is optional; assume no-op if not provided by AM */
|
||||||
if (parallel_aware && indexRelation->rd_indam->aminitparallelscan != NULL)
|
if (indexRelation->rd_indam->aminitparallelscan != NULL)
|
||||||
{
|
{
|
||||||
void *amtarget;
|
void *amtarget;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -259,14 +259,20 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
|
||||||
e->pcxt);
|
e->pcxt);
|
||||||
break;
|
break;
|
||||||
case T_IndexScanState:
|
case T_IndexScanState:
|
||||||
|
if (planstate->plan->parallel_aware)
|
||||||
|
ExecIndexScanEstimate((IndexScanState *) planstate,
|
||||||
|
e->pcxt);
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
ExecIndexScanEstimate((IndexScanState *) planstate,
|
ExecIndexScanInstrumentEstimate((IndexScanState *) planstate,
|
||||||
e->pcxt);
|
e->pcxt);
|
||||||
break;
|
break;
|
||||||
case T_IndexOnlyScanState:
|
case T_IndexOnlyScanState:
|
||||||
|
if (planstate->plan->parallel_aware)
|
||||||
|
ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
|
||||||
|
e->pcxt);
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
|
ExecIndexOnlyScanInstrumentEstimate((IndexOnlyScanState *) planstate,
|
||||||
e->pcxt);
|
e->pcxt);
|
||||||
break;
|
break;
|
||||||
case T_BitmapIndexScanState:
|
case T_BitmapIndexScanState:
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
|
|
@ -493,13 +499,20 @@ ExecParallelInitializeDSM(PlanState *planstate,
|
||||||
d->pcxt);
|
d->pcxt);
|
||||||
break;
|
break;
|
||||||
case T_IndexScanState:
|
case T_IndexScanState:
|
||||||
|
if (planstate->plan->parallel_aware)
|
||||||
|
ExecIndexScanInitializeDSM((IndexScanState *) planstate,
|
||||||
|
d->pcxt);
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
|
ExecIndexScanInstrumentInitDSM((IndexScanState *) planstate,
|
||||||
|
d->pcxt);
|
||||||
break;
|
break;
|
||||||
case T_IndexOnlyScanState:
|
case T_IndexOnlyScanState:
|
||||||
|
if (planstate->plan->parallel_aware)
|
||||||
|
ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
|
||||||
|
d->pcxt);
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
|
ExecIndexOnlyScanInstrumentInitDSM((IndexOnlyScanState *) planstate,
|
||||||
d->pcxt);
|
d->pcxt);
|
||||||
break;
|
break;
|
||||||
case T_BitmapIndexScanState:
|
case T_BitmapIndexScanState:
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
|
|
@ -1371,13 +1384,20 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
|
||||||
ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
|
ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
|
||||||
break;
|
break;
|
||||||
case T_IndexScanState:
|
case T_IndexScanState:
|
||||||
|
if (planstate->plan->parallel_aware)
|
||||||
|
ExecIndexScanInitializeWorker((IndexScanState *) planstate,
|
||||||
|
pwcxt);
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
|
ExecIndexScanInstrumentInitWorker((IndexScanState *) planstate,
|
||||||
|
pwcxt);
|
||||||
break;
|
break;
|
||||||
case T_IndexOnlyScanState:
|
case T_IndexOnlyScanState:
|
||||||
|
if (planstate->plan->parallel_aware)
|
||||||
|
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
|
||||||
|
pwcxt);
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
|
ExecIndexOnlyScanInstrumentInitWorker((IndexOnlyScanState *) planstate,
|
||||||
pwcxt);
|
pwcxt);
|
||||||
break;
|
break;
|
||||||
case T_BitmapIndexScanState:
|
case T_BitmapIndexScanState:
|
||||||
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
/* even when not parallel-aware, for EXPLAIN ANALYZE */
|
||||||
|
|
|
||||||
|
|
@ -394,7 +394,9 @@ ExecBitmapIndexScanInitializeDSM(BitmapIndexScanState *node,
|
||||||
node->biss_SharedInfo =
|
node->biss_SharedInfo =
|
||||||
(SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc,
|
(SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc,
|
||||||
size);
|
size);
|
||||||
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
|
shm_toc_insert(pcxt->toc,
|
||||||
|
node->ss.ps.plan->plan_node_id +
|
||||||
|
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
|
||||||
node->biss_SharedInfo);
|
node->biss_SharedInfo);
|
||||||
|
|
||||||
/* Each per-worker area must start out as zeroes */
|
/* Each per-worker area must start out as zeroes */
|
||||||
|
|
@ -417,7 +419,10 @@ ExecBitmapIndexScanInitializeWorker(BitmapIndexScanState *node,
|
||||||
return;
|
return;
|
||||||
|
|
||||||
node->biss_SharedInfo = (SharedIndexScanInstrumentation *)
|
node->biss_SharedInfo = (SharedIndexScanInstrumentation *)
|
||||||
shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
|
shm_toc_lookup(pwcxt->toc,
|
||||||
|
node->ss.ps.plan->plan_node_id +
|
||||||
|
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
|
||||||
|
false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ----------------------------------------------------------------
|
/* ----------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -736,21 +736,11 @@ ExecIndexOnlyScanEstimate(IndexOnlyScanState *node,
|
||||||
ParallelContext *pcxt)
|
ParallelContext *pcxt)
|
||||||
{
|
{
|
||||||
EState *estate = node->ss.ps.state;
|
EState *estate = node->ss.ps.state;
|
||||||
bool instrument = (node->ss.ps.instrument != NULL);
|
|
||||||
bool parallel_aware = node->ss.ps.plan->parallel_aware;
|
|
||||||
|
|
||||||
if (!instrument && !parallel_aware)
|
|
||||||
{
|
|
||||||
/* No DSM required by the scan */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
node->ioss_PscanLen = index_parallelscan_estimate(node->ioss_RelationDesc,
|
node->ioss_PscanLen = index_parallelscan_estimate(node->ioss_RelationDesc,
|
||||||
node->ioss_NumScanKeys,
|
node->ioss_NumScanKeys,
|
||||||
node->ioss_NumOrderByKeys,
|
node->ioss_NumOrderByKeys,
|
||||||
estate->es_snapshot,
|
estate->es_snapshot);
|
||||||
instrument, parallel_aware,
|
|
||||||
pcxt->nworkers);
|
|
||||||
shm_toc_estimate_chunk(&pcxt->estimator, node->ioss_PscanLen);
|
shm_toc_estimate_chunk(&pcxt->estimator, node->ioss_PscanLen);
|
||||||
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||||
}
|
}
|
||||||
|
|
@ -767,29 +757,14 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
|
||||||
{
|
{
|
||||||
EState *estate = node->ss.ps.state;
|
EState *estate = node->ss.ps.state;
|
||||||
ParallelIndexScanDesc piscan;
|
ParallelIndexScanDesc piscan;
|
||||||
bool instrument = node->ss.ps.instrument != NULL;
|
|
||||||
bool parallel_aware = node->ss.ps.plan->parallel_aware;
|
|
||||||
|
|
||||||
if (!instrument && !parallel_aware)
|
|
||||||
{
|
|
||||||
/* No DSM required by the scan */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
piscan = shm_toc_allocate(pcxt->toc, node->ioss_PscanLen);
|
piscan = shm_toc_allocate(pcxt->toc, node->ioss_PscanLen);
|
||||||
index_parallelscan_initialize(node->ss.ss_currentRelation,
|
index_parallelscan_initialize(node->ss.ss_currentRelation,
|
||||||
node->ioss_RelationDesc,
|
node->ioss_RelationDesc,
|
||||||
estate->es_snapshot,
|
estate->es_snapshot,
|
||||||
instrument, parallel_aware, pcxt->nworkers,
|
piscan);
|
||||||
&node->ioss_SharedInfo, piscan);
|
|
||||||
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
|
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
|
||||||
|
|
||||||
if (!parallel_aware)
|
|
||||||
{
|
|
||||||
/* Only here to initialize SharedInfo in DSM */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
node->ioss_ScanDesc =
|
node->ioss_ScanDesc =
|
||||||
index_beginscan_parallel(node->ss.ss_currentRelation,
|
index_beginscan_parallel(node->ss.ss_currentRelation,
|
||||||
node->ioss_RelationDesc,
|
node->ioss_RelationDesc,
|
||||||
|
|
@ -837,27 +812,9 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
|
||||||
ParallelWorkerContext *pwcxt)
|
ParallelWorkerContext *pwcxt)
|
||||||
{
|
{
|
||||||
ParallelIndexScanDesc piscan;
|
ParallelIndexScanDesc piscan;
|
||||||
bool instrument = node->ss.ps.instrument != NULL;
|
|
||||||
bool parallel_aware = node->ss.ps.plan->parallel_aware;
|
|
||||||
|
|
||||||
if (!instrument && !parallel_aware)
|
|
||||||
{
|
|
||||||
/* No DSM required by the scan */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
|
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
|
||||||
|
|
||||||
if (instrument)
|
|
||||||
node->ioss_SharedInfo = (SharedIndexScanInstrumentation *)
|
|
||||||
OffsetToPointer(piscan, piscan->ps_offset_ins);
|
|
||||||
|
|
||||||
if (!parallel_aware)
|
|
||||||
{
|
|
||||||
/* Only here to set up worker node's SharedInfo */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
node->ioss_ScanDesc =
|
node->ioss_ScanDesc =
|
||||||
index_beginscan_parallel(node->ss.ss_currentRelation,
|
index_beginscan_parallel(node->ss.ss_currentRelation,
|
||||||
node->ioss_RelationDesc,
|
node->ioss_RelationDesc,
|
||||||
|
|
@ -879,6 +836,73 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
|
||||||
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
|
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Compute the amount of space we'll need for the shared instrumentation and
|
||||||
|
* inform pcxt->estimator.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecIndexOnlyScanInstrumentEstimate(IndexOnlyScanState *node,
|
||||||
|
ParallelContext *pcxt)
|
||||||
|
{
|
||||||
|
Size size;
|
||||||
|
|
||||||
|
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This size calculation is trivial enough that we don't bother saving it
|
||||||
|
* in the IndexOnlyScanState. We'll recalculate the needed size in
|
||||||
|
* ExecIndexOnlyScanInstrumentInitDSM().
|
||||||
|
*/
|
||||||
|
size = offsetof(SharedIndexScanInstrumentation, winstrument) +
|
||||||
|
pcxt->nworkers * sizeof(IndexScanInstrumentation);
|
||||||
|
shm_toc_estimate_chunk(&pcxt->estimator, size);
|
||||||
|
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set up parallel index-only scan instrumentation.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecIndexOnlyScanInstrumentInitDSM(IndexOnlyScanState *node,
|
||||||
|
ParallelContext *pcxt)
|
||||||
|
{
|
||||||
|
Size size;
|
||||||
|
|
||||||
|
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
size = offsetof(SharedIndexScanInstrumentation, winstrument) +
|
||||||
|
pcxt->nworkers * sizeof(IndexScanInstrumentation);
|
||||||
|
node->ioss_SharedInfo =
|
||||||
|
(SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc, size);
|
||||||
|
|
||||||
|
/* Each per-worker area must start out as zeroes */
|
||||||
|
memset(node->ioss_SharedInfo, 0, size);
|
||||||
|
node->ioss_SharedInfo->num_workers = pcxt->nworkers;
|
||||||
|
shm_toc_insert(pcxt->toc,
|
||||||
|
node->ss.ps.plan->plan_node_id +
|
||||||
|
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
|
||||||
|
node->ioss_SharedInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Look up and save the location of the shared instrumentation.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecIndexOnlyScanInstrumentInitWorker(IndexOnlyScanState *node,
|
||||||
|
ParallelWorkerContext *pwcxt)
|
||||||
|
{
|
||||||
|
if (!node->ss.ps.instrument)
|
||||||
|
return;
|
||||||
|
|
||||||
|
node->ioss_SharedInfo = (SharedIndexScanInstrumentation *)
|
||||||
|
shm_toc_lookup(pwcxt->toc,
|
||||||
|
node->ss.ps.plan->plan_node_id +
|
||||||
|
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
|
||||||
/* ----------------------------------------------------------------
|
/* ----------------------------------------------------------------
|
||||||
* ExecIndexOnlyScanRetrieveInstrumentation
|
* ExecIndexOnlyScanRetrieveInstrumentation
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -1674,21 +1674,11 @@ ExecIndexScanEstimate(IndexScanState *node,
|
||||||
ParallelContext *pcxt)
|
ParallelContext *pcxt)
|
||||||
{
|
{
|
||||||
EState *estate = node->ss.ps.state;
|
EState *estate = node->ss.ps.state;
|
||||||
bool instrument = node->ss.ps.instrument != NULL;
|
|
||||||
bool parallel_aware = node->ss.ps.plan->parallel_aware;
|
|
||||||
|
|
||||||
if (!instrument && !parallel_aware)
|
|
||||||
{
|
|
||||||
/* No DSM required by the scan */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc,
|
node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc,
|
||||||
node->iss_NumScanKeys,
|
node->iss_NumScanKeys,
|
||||||
node->iss_NumOrderByKeys,
|
node->iss_NumOrderByKeys,
|
||||||
estate->es_snapshot,
|
estate->es_snapshot);
|
||||||
instrument, parallel_aware,
|
|
||||||
pcxt->nworkers);
|
|
||||||
shm_toc_estimate_chunk(&pcxt->estimator, node->iss_PscanLen);
|
shm_toc_estimate_chunk(&pcxt->estimator, node->iss_PscanLen);
|
||||||
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||||
}
|
}
|
||||||
|
|
@ -1705,29 +1695,14 @@ ExecIndexScanInitializeDSM(IndexScanState *node,
|
||||||
{
|
{
|
||||||
EState *estate = node->ss.ps.state;
|
EState *estate = node->ss.ps.state;
|
||||||
ParallelIndexScanDesc piscan;
|
ParallelIndexScanDesc piscan;
|
||||||
bool instrument = node->ss.ps.instrument != NULL;
|
|
||||||
bool parallel_aware = node->ss.ps.plan->parallel_aware;
|
|
||||||
|
|
||||||
if (!instrument && !parallel_aware)
|
|
||||||
{
|
|
||||||
/* No DSM required by the scan */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
piscan = shm_toc_allocate(pcxt->toc, node->iss_PscanLen);
|
piscan = shm_toc_allocate(pcxt->toc, node->iss_PscanLen);
|
||||||
index_parallelscan_initialize(node->ss.ss_currentRelation,
|
index_parallelscan_initialize(node->ss.ss_currentRelation,
|
||||||
node->iss_RelationDesc,
|
node->iss_RelationDesc,
|
||||||
estate->es_snapshot,
|
estate->es_snapshot,
|
||||||
instrument, parallel_aware, pcxt->nworkers,
|
piscan);
|
||||||
&node->iss_SharedInfo, piscan);
|
|
||||||
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
|
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
|
||||||
|
|
||||||
if (!parallel_aware)
|
|
||||||
{
|
|
||||||
/* Only here to initialize SharedInfo in DSM */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
node->iss_ScanDesc =
|
node->iss_ScanDesc =
|
||||||
index_beginscan_parallel(node->ss.ss_currentRelation,
|
index_beginscan_parallel(node->ss.ss_currentRelation,
|
||||||
node->iss_RelationDesc,
|
node->iss_RelationDesc,
|
||||||
|
|
@ -1773,27 +1748,9 @@ ExecIndexScanInitializeWorker(IndexScanState *node,
|
||||||
ParallelWorkerContext *pwcxt)
|
ParallelWorkerContext *pwcxt)
|
||||||
{
|
{
|
||||||
ParallelIndexScanDesc piscan;
|
ParallelIndexScanDesc piscan;
|
||||||
bool instrument = node->ss.ps.instrument != NULL;
|
|
||||||
bool parallel_aware = node->ss.ps.plan->parallel_aware;
|
|
||||||
|
|
||||||
if (!instrument && !parallel_aware)
|
|
||||||
{
|
|
||||||
/* No DSM required by the scan */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
|
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
|
||||||
|
|
||||||
if (instrument)
|
|
||||||
node->iss_SharedInfo = (SharedIndexScanInstrumentation *)
|
|
||||||
OffsetToPointer(piscan, piscan->ps_offset_ins);
|
|
||||||
|
|
||||||
if (!parallel_aware)
|
|
||||||
{
|
|
||||||
/* Only here to set up worker node's SharedInfo */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
node->iss_ScanDesc =
|
node->iss_ScanDesc =
|
||||||
index_beginscan_parallel(node->ss.ss_currentRelation,
|
index_beginscan_parallel(node->ss.ss_currentRelation,
|
||||||
node->iss_RelationDesc,
|
node->iss_RelationDesc,
|
||||||
|
|
@ -1814,6 +1771,73 @@ ExecIndexScanInitializeWorker(IndexScanState *node,
|
||||||
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
|
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Compute the amount of space we'll need for the shared instrumentation and
|
||||||
|
* inform pcxt->estimator.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecIndexScanInstrumentEstimate(IndexScanState *node,
|
||||||
|
ParallelContext *pcxt)
|
||||||
|
{
|
||||||
|
Size size;
|
||||||
|
|
||||||
|
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This size calculation is trivial enough that we don't bother saving it
|
||||||
|
* in the IndexScanState. We'll recalculate the needed size in
|
||||||
|
* ExecIndexScanInstrumentInitDSM().
|
||||||
|
*/
|
||||||
|
size = offsetof(SharedIndexScanInstrumentation, winstrument) +
|
||||||
|
pcxt->nworkers * sizeof(IndexScanInstrumentation);
|
||||||
|
shm_toc_estimate_chunk(&pcxt->estimator, size);
|
||||||
|
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set up parallel index scan instrumentation.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecIndexScanInstrumentInitDSM(IndexScanState *node,
|
||||||
|
ParallelContext *pcxt)
|
||||||
|
{
|
||||||
|
Size size;
|
||||||
|
|
||||||
|
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
size = offsetof(SharedIndexScanInstrumentation, winstrument) +
|
||||||
|
pcxt->nworkers * sizeof(IndexScanInstrumentation);
|
||||||
|
node->iss_SharedInfo =
|
||||||
|
(SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc, size);
|
||||||
|
|
||||||
|
/* Each per-worker area must start out as zeroes */
|
||||||
|
memset(node->iss_SharedInfo, 0, size);
|
||||||
|
node->iss_SharedInfo->num_workers = pcxt->nworkers;
|
||||||
|
shm_toc_insert(pcxt->toc,
|
||||||
|
node->ss.ps.plan->plan_node_id +
|
||||||
|
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
|
||||||
|
node->iss_SharedInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Look up and save the location of the shared instrumentation.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecIndexScanInstrumentInitWorker(IndexScanState *node,
|
||||||
|
ParallelWorkerContext *pwcxt)
|
||||||
|
{
|
||||||
|
if (!node->ss.ps.instrument)
|
||||||
|
return;
|
||||||
|
|
||||||
|
node->iss_SharedInfo = (SharedIndexScanInstrumentation *)
|
||||||
|
shm_toc_lookup(pwcxt->toc,
|
||||||
|
node->ss.ps.plan->plan_node_id +
|
||||||
|
PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
|
||||||
/* ----------------------------------------------------------------
|
/* ----------------------------------------------------------------
|
||||||
* ExecIndexScanRetrieveInstrumentation
|
* ExecIndexScanRetrieveInstrumentation
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -171,14 +171,9 @@ extern void index_endscan(IndexScanDesc scan);
|
||||||
extern void index_markpos(IndexScanDesc scan);
|
extern void index_markpos(IndexScanDesc scan);
|
||||||
extern void index_restrpos(IndexScanDesc scan);
|
extern void index_restrpos(IndexScanDesc scan);
|
||||||
extern Size index_parallelscan_estimate(Relation indexRelation,
|
extern Size index_parallelscan_estimate(Relation indexRelation,
|
||||||
int nkeys, int norderbys, Snapshot snapshot,
|
int nkeys, int norderbys, Snapshot snapshot);
|
||||||
bool instrument, bool parallel_aware,
|
|
||||||
int nworkers);
|
|
||||||
extern void index_parallelscan_initialize(Relation heapRelation,
|
extern void index_parallelscan_initialize(Relation heapRelation,
|
||||||
Relation indexRelation, Snapshot snapshot,
|
Relation indexRelation, Snapshot snapshot,
|
||||||
bool instrument, bool parallel_aware,
|
|
||||||
int nworkers,
|
|
||||||
SharedIndexScanInstrumentation **sharedinfo,
|
|
||||||
ParallelIndexScanDesc target);
|
ParallelIndexScanDesc target);
|
||||||
extern void index_parallelrescan(IndexScanDesc scan);
|
extern void index_parallelrescan(IndexScanDesc scan);
|
||||||
extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
|
extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
|
||||||
|
|
|
||||||
|
|
@ -203,7 +203,6 @@ typedef struct ParallelIndexScanDescData
|
||||||
{
|
{
|
||||||
RelFileLocator ps_locator; /* physical table relation to scan */
|
RelFileLocator ps_locator; /* physical table relation to scan */
|
||||||
RelFileLocator ps_indexlocator; /* physical index relation to scan */
|
RelFileLocator ps_indexlocator; /* physical index relation to scan */
|
||||||
Size ps_offset_ins; /* Offset to SharedIndexScanInstrumentation */
|
|
||||||
Size ps_offset_am; /* Offset to am-specific structure */
|
Size ps_offset_am; /* Offset to am-specific structure */
|
||||||
char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
|
char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
|
||||||
} ParallelIndexScanDescData;
|
} ParallelIndexScanDescData;
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,14 @@
|
||||||
#ifndef INSTRUMENT_NODE_H
|
#ifndef INSTRUMENT_NODE_H
|
||||||
#define INSTRUMENT_NODE_H
|
#define INSTRUMENT_NODE_H
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Offset added to plan_node_id to create a second TOC key for per-worker scan
|
||||||
|
* instrumentation. Instrumentation and parallel-awareness are independent, so
|
||||||
|
* separate DSM chunks let each be allocated and initialized only when needed.
|
||||||
|
* In the future, if nodes need more DSM allocations, we would need a more
|
||||||
|
* robust system.
|
||||||
|
*/
|
||||||
|
#define PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET UINT64CONST(0xD000000000000000)
|
||||||
|
|
||||||
/* ---------------------
|
/* ---------------------
|
||||||
* Instrumentation information for aggregate function execution
|
* Instrumentation information for aggregate function execution
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,12 @@ extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
|
||||||
ParallelContext *pcxt);
|
ParallelContext *pcxt);
|
||||||
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
|
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
|
||||||
ParallelWorkerContext *pwcxt);
|
ParallelWorkerContext *pwcxt);
|
||||||
|
extern void ExecIndexOnlyScanInstrumentEstimate(IndexOnlyScanState *node,
|
||||||
|
ParallelContext *pcxt);
|
||||||
|
extern void ExecIndexOnlyScanInstrumentInitDSM(IndexOnlyScanState *node,
|
||||||
|
ParallelContext *pcxt);
|
||||||
|
extern void ExecIndexOnlyScanInstrumentInitWorker(IndexOnlyScanState *node,
|
||||||
|
ParallelWorkerContext *pwcxt);
|
||||||
extern void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node);
|
extern void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node);
|
||||||
|
|
||||||
#endif /* NODEINDEXONLYSCAN_H */
|
#endif /* NODEINDEXONLYSCAN_H */
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,12 @@ extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pc
|
||||||
extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
|
extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
|
||||||
extern void ExecIndexScanInitializeWorker(IndexScanState *node,
|
extern void ExecIndexScanInitializeWorker(IndexScanState *node,
|
||||||
ParallelWorkerContext *pwcxt);
|
ParallelWorkerContext *pwcxt);
|
||||||
|
extern void ExecIndexScanInstrumentEstimate(IndexScanState *node,
|
||||||
|
ParallelContext *pcxt);
|
||||||
|
extern void ExecIndexScanInstrumentInitDSM(IndexScanState *node,
|
||||||
|
ParallelContext *pcxt);
|
||||||
|
extern void ExecIndexScanInstrumentInitWorker(IndexScanState *node,
|
||||||
|
ParallelWorkerContext *pwcxt);
|
||||||
extern void ExecIndexScanRetrieveInstrumentation(IndexScanState *node);
|
extern void ExecIndexScanRetrieveInstrumentation(IndexScanState *node);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue