pg_stash_advice: Allow stashed advice to be persisted to disk.

If pg_stash_advice.persist = true, stashed advice will be written to
pg_stash_advice.tsv in the data directory, periodically and at
shutdown. On restart, stash modifications are locked out until this
file has been reloaded, but queries will not be, so there may be a
short window after startup during which previously-stashed advice is
not automatically applied.

Author: Robert Haas <rhaas@postgresql.org>
Co-authored-by: Lukas Fittl <lukas@fittl.com>
Discussion: https://postgr.es/m/CA+Tgmob87qsWa-VugofU6epuV0H5XjWZGMbQas4Q-ADKmvSyBg@mail.gmail.com
This commit is contained in:
Robert Haas 2026-04-07 10:11:25 -04:00
parent 29e7dbf5e4
commit c10edb102a
10 changed files with 1192 additions and 7 deletions

View file

@ -4,13 +4,15 @@ MODULE_big = pg_stash_advice
OBJS = \
$(WIN32RES) \
pg_stash_advice.o \
stashfuncs.o
stashfuncs.o \
stashpersist.o
EXTENSION = pg_stash_advice
DATA = pg_stash_advice--1.0.sql
PGFILEDESC = "pg_stash_advice - store and automatically apply plan advice"
REGRESS = pg_stash_advice pg_stash_advice_utf8
TAP_TESTS = 1
EXTRA_INSTALL = contrib/pg_plan_advice
ifdef USE_PGXS

View file

@ -2,7 +2,8 @@
pg_stash_advice_sources = files(
'pg_stash_advice.c',
'stashfuncs.c'
'stashfuncs.c',
'stashpersist.c'
)
if host_system == 'windows'
@ -34,4 +35,9 @@ tests += {
'pg_stash_advice_utf8',
],
},
'tap': {
'tests': [
't/001_persist.pl',
],
},
}

View file

@ -36,8 +36,14 @@ RETURNS SETOF record
AS 'MODULE_PATHNAME', 'pg_get_advice_stash_contents'
LANGUAGE C;
CREATE FUNCTION pg_start_stash_advice_worker()
RETURNS void
AS 'MODULE_PATHNAME', 'pg_start_stash_advice_worker'
LANGUAGE C STRICT;
REVOKE ALL ON FUNCTION pg_create_advice_stash(text) FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_drop_advice_stash(text) FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_get_advice_stash_contents(text) FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_get_advice_stashes() FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_set_stashed_advice(text, bigint, text) FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_start_stash_advice_worker() FROM PUBLIC;

View file

@ -13,9 +13,11 @@
#include "common/hashfn.h"
#include "common/string.h"
#include "miscadmin.h"
#include "nodes/queryjumble.h"
#include "pg_plan_advice.h"
#include "pg_stash_advice.h"
#include "postmaster/bgworker.h"
#include "storage/dsm_registry.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@ -41,8 +43,10 @@ static dshash_parameters pgsa_entry_dshash_parameters = {
LWTRANCHE_INVALID /* gets set at runtime */
};
/* GUC variable */
/* GUC variables */
static char *pg_stash_advice_stash_name = "";
bool pg_stash_advice_persist = true;
int pg_stash_advice_persist_interval = 30;
/* Shared memory pointers */
pgsa_shared_state *pgsa_state;
@ -87,6 +91,33 @@ _PG_init(void)
EnableQueryId();
/* Define our GUCs. */
if (process_shared_preload_libraries_in_progress)
DefineCustomBoolVariable("pg_stash_advice.persist",
"Save and restore advice stash contents across restarts.",
NULL,
&pg_stash_advice_persist,
true,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
else
pg_stash_advice_persist = false;
DefineCustomIntVariable("pg_stash_advice.persist_interval",
"Interval between advice stash saves, in seconds.",
NULL,
&pg_stash_advice_persist_interval,
30,
0,
3600,
PGC_SIGHUP,
GUC_UNIT_S,
NULL,
NULL,
NULL);
DefineCustomStringVariable("pg_stash_advice.stash_name",
"Name of the advice stash to be used in this session.",
NULL,
@ -100,6 +131,10 @@ _PG_init(void)
MarkGUCPrefixReserved("pg_stash_advice");
/* Start the background worker for persistence, if enabled. */
if (pg_stash_advice_persist)
pgsa_start_worker();
/* Tell pg_plan_advice that we want to provide advice strings. */
add_advisor_fn =
load_external_function("pg_plan_advice", "pg_plan_advice_add_advisor",
@ -131,6 +166,10 @@ pgsa_advisor(PlannerGlobal *glob, Query *parse,
if (unlikely(pgsa_entry_dshash == NULL))
pgsa_attach();
/* If stash data is still being restored from disk, ignore. */
if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready))
return NULL;
/*
* Translate pg_stash_advice.stash_name to an integer ID.
*
@ -279,6 +318,19 @@ pgsa_attach(void)
MemoryContextSwitchTo(oldcontext);
}
/*
* Error out if the stashes have not been loaded from disk yet.
*/
void
pgsa_check_lockout(void)
{
if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("stash modifications are not allowed because \"%s\" has not been loaded yet",
PGSA_DUMP_FILE)));
}
/*
* Check whether an advice stash name is legal, and signal an error if not.
*
@ -383,6 +435,9 @@ pgsa_create_stash(char *stash_name)
errmsg("advice stash \"%s\" already exists", stash_name));
stash->pgsa_stash_id = pgsa_state->next_stash_id++;
dshash_release_lock(pgsa_stash_dshash, stash);
/* Bump change count. */
pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1);
}
/*
@ -423,6 +478,9 @@ pgsa_clear_advice_string(char *stash_name, int64 queryId)
/* Now we free the advice string as well, if there was one. */
if (old_dp != InvalidDsaPointer)
dsa_free(pgsa_dsa_area, old_dp);
/* Bump change count. */
pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1);
}
/*
@ -464,6 +522,43 @@ pgsa_drop_stash(char *stash_name)
}
}
dshash_seq_term(&iterator);
/* Bump change count. */
pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1);
}
/*
 * Empty both shared hash tables and restart stash ID assignment.
 *
 * Called before (re)loading the dump file, so that whatever a previous,
 * failed load attempt managed to put into shared memory is discarded.
 *
 * The caller must hold pgsa_state->lock in exclusive mode.
 */
void
pgsa_reset_all_stashes(void)
{
	dshash_seq_status scan;

	Assert(LWLockHeldByMeInMode(&pgsa_state->lock, LW_EXCLUSIVE));

	/* First pass: delete every stash. */
	for (dshash_seq_init(&scan, pgsa_stash_dshash, true);
		 dshash_seq_next(&scan) != NULL;)
		dshash_delete_current(&scan);
	dshash_seq_term(&scan);

	/* Second pass: delete every entry, freeing its advice string first. */
	dshash_seq_init(&scan, pgsa_entry_dshash, true);
	for (;;)
	{
		pgsa_entry *victim = dshash_seq_next(&scan);

		if (victim == NULL)
			break;
		if (DsaPointerIsValid(victim->advice_string))
			dsa_free(pgsa_dsa_area, victim->advice_string);
		dshash_delete_current(&scan);
	}
	dshash_seq_term(&scan);

	/* Stash IDs start over from 1. */
	pgsa_state->next_stash_id = UINT64CONST(1);
}
/*
@ -483,6 +578,23 @@ pgsa_init_shared_state(void *ptr, void *arg)
state->area = DSA_HANDLE_INVALID;
state->stash_hash = DSHASH_HANDLE_INVALID;
state->entry_hash = DSHASH_HANDLE_INVALID;
state->bgworker_pid = InvalidPid;
pg_atomic_init_flag(&state->stashes_ready);
pg_atomic_init_u64(&state->change_count, 0);
/*
* If this module was loaded via shared_preload_libraries, then
* pg_stash_advice_persist is a GUC variable. If it's true, that means
* that we should lock out manual stash modifications until the dump file
* has been successfully loaded. If it's false, there's nothing to load,
* so we set stashes_ready immediately.
*
* If this module was not loaded via shared_preload_libraries, then
* pg_stash_advice_persist is not a GUC variable, but it will be false,
* which leads to the correct behavior.
*/
if (!pg_stash_advice_persist)
pg_atomic_test_set_flag(&state->stashes_ready);
}
/*
@ -602,4 +714,60 @@ pgsa_set_advice_string(char *stash_name, int64 queryId, char *advice_string)
*/
if (DsaPointerIsValid(old_dp))
dsa_free(pgsa_dsa_area, old_dp);
/* Bump change count. */
pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1);
}
/*
 * Launch the pg_stash_advice persistence worker.
 *
 * During shared_preload_libraries processing the worker is registered
 * statically; otherwise it is registered dynamically and we wait for it to
 * start, raising an ERROR if registration or startup fails.
 */
void
pgsa_start_worker(void)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	pid_t		worker_pid;

	/* Describe the worker; every field not set below stays zero. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
	bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL;
	strcpy(bgw.bgw_library_name, "pg_stash_advice");
	strcpy(bgw.bgw_function_name, "pg_stash_advice_worker_main");
	strcpy(bgw.bgw_name, "pg_stash_advice worker");
	strcpy(bgw.bgw_type, "pg_stash_advice worker");

	/*
	 * While preload libraries are being processed we are either in the
	 * postmaster, where this really registers the worker, or in a child
	 * process of an EXEC_BACKEND build, where it silently does nothing. Both
	 * outcomes are what we want.
	 */
	if (process_shared_preload_libraries_in_progress)
	{
		RegisterBackgroundWorker(&bgw);
		return;
	}

	/*
	 * Otherwise this is a request made after system startup. Unless this is
	 * single-user mode we are not the postmaster, so register the worker
	 * dynamically and wait for it to come up. (In single-user mode this will
	 * error out, which is fine.)
	 */
	bgw.bgw_notify_pid = MyProcPid;
	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("could not register background process"),
				 errhint("You may need to increase \"max_worker_processes\".")));

	if (WaitForBackgroundWorkerStartup(bgw_handle, &worker_pid) != BGWH_STARTED)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("could not start background process"),
				 errhint("More details may be available in the server log.")));
}

View file

@ -22,6 +22,8 @@
#include "lib/dshash.h"
#include "storage/lwlock.h"
#define PGSA_DUMP_FILE "pg_stash_advice.tsv"
/*
* The key that we use to find a particular stash entry.
*/
@ -62,6 +64,9 @@ typedef struct pgsa_shared_state
dsa_handle area;
dshash_table_handle stash_hash;
dshash_table_handle entry_hash;
pid_t bgworker_pid;
pg_atomic_flag stashes_ready;
pg_atomic_uint64 change_count;
} pgsa_shared_state;
/* For stash ID -> stash name hash table */
@ -86,14 +91,21 @@ extern dsa_area *pgsa_dsa_area;
extern dshash_table *pgsa_stash_dshash;
extern dshash_table *pgsa_entry_dshash;
/* GUC variables */
extern bool pg_stash_advice_persist;
extern int pg_stash_advice_persist_interval;
/* Function prototypes */
extern void pgsa_attach(void);
extern void pgsa_check_lockout(void);
extern void pgsa_check_stash_name(char *stash_name);
extern void pgsa_clear_advice_string(char *stash_name, int64 queryId);
extern void pgsa_create_stash(char *stash_name);
extern void pgsa_drop_stash(char *stash_name);
extern uint64 pgsa_lookup_stash_id(char *stash_name);
extern void pgsa_reset_all_stashes(void);
extern void pgsa_set_advice_string(char *stash_name, int64 queryId,
char *advice_string);
extern void pgsa_start_worker(void);
#endif

View file

@ -14,6 +14,7 @@
#include "common/hashfn.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pg_stash_advice.h"
#include "utils/builtins.h"
#include "utils/tuplestore.h"
@ -23,6 +24,7 @@ PG_FUNCTION_INFO_V1(pg_drop_advice_stash);
PG_FUNCTION_INFO_V1(pg_get_advice_stash_contents);
PG_FUNCTION_INFO_V1(pg_get_advice_stashes);
PG_FUNCTION_INFO_V1(pg_set_stashed_advice);
PG_FUNCTION_INFO_V1(pg_start_stash_advice_worker);
typedef struct pgsa_stash_count
{
@ -53,6 +55,7 @@ pg_create_advice_stash(PG_FUNCTION_ARGS)
pgsa_check_stash_name(stash_name);
if (unlikely(pgsa_entry_dshash == NULL))
pgsa_attach();
pgsa_check_lockout();
LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE);
pgsa_create_stash(stash_name);
LWLockRelease(&pgsa_state->lock);
@ -70,6 +73,7 @@ pg_drop_advice_stash(PG_FUNCTION_ARGS)
pgsa_check_stash_name(stash_name);
if (unlikely(pgsa_entry_dshash == NULL))
pgsa_attach();
pgsa_check_lockout();
LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE);
pgsa_drop_stash(stash_name);
LWLockRelease(&pgsa_state->lock);
@ -94,6 +98,10 @@ pg_get_advice_stashes(PG_FUNCTION_ARGS)
if (unlikely(pgsa_entry_dshash == NULL))
pgsa_attach();
/* If stash data is still being restored from disk, ignore. */
if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready))
return (Datum) 0;
/* Tally up the number of entries per stash. */
chash = pgsa_stash_count_table_create(CurrentMemoryContext, 64, NULL);
dshash_seq_init(&iterator, pgsa_entry_dshash, true);
@ -154,6 +162,10 @@ pg_get_advice_stash_contents(PG_FUNCTION_ARGS)
if (unlikely(pgsa_entry_dshash == NULL))
pgsa_attach();
/* If stash data is still being restored from disk, ignore. */
if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready))
return (Datum) 0;
/* User can pass NULL for all stashes, or the name of a specific stash. */
if (!PG_ARGISNULL(0))
{
@ -286,6 +298,9 @@ pg_set_stashed_advice(PG_FUNCTION_ARGS)
if (unlikely(pgsa_entry_dshash == NULL))
pgsa_attach();
/* Don't allow writes if stash data is still being restored from disk. */
pgsa_check_lockout();
/* Now call the appropriate function to do the real work. */
if (PG_ARGISNULL(2))
{
@ -304,3 +319,29 @@ pg_set_stashed_advice(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
 * SQL-callable function: start the persistence background worker.
 *
 * Raises an ERROR if shared memory already records a running worker;
 * otherwise hands off to pgsa_start_worker().
 */
Datum
pg_start_stash_advice_worker(PG_FUNCTION_ARGS)
{
	pid_t		running_pid;

	if (unlikely(pgsa_entry_dshash == NULL))
		pgsa_attach();

	/* Fetch the recorded worker PID under the lock. */
	LWLockAcquire(&pgsa_state->lock, LW_SHARED);
	running_pid = pgsa_state->bgworker_pid;
	LWLockRelease(&pgsa_state->lock);

	if (running_pid == InvalidPid)
	{
		pgsa_start_worker();
		PG_RETURN_VOID();
	}

	ereport(ERROR,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("pg_stash_advice worker is already running under PID %d",
					(int) running_pid)));

	PG_RETURN_VOID();			/* not reached */
}

View file

@ -0,0 +1,800 @@
/*-------------------------------------------------------------------------
*
* stashpersist.c
* Persistence support for pg_stash_advice.
*
* Copyright (c) 2016-2026, PostgreSQL Global Development Group
*
* contrib/pg_stash_advice/stashpersist.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <sys/stat.h>
#include "common/hashfn.h"
#include "miscadmin.h"
#include "pg_stash_advice.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "utils/backend_status.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
/*
 * State shared by the functions that write the dump file.
 */
typedef struct pgsa_writer_context
{
/* Path of the temporary file being written (dump file name plus ".tmp"). */
char pathname[MAXPGPATH];
/* Open handle on that temporary file, obtained from AllocateFile(). */
FILE *file;
/* Stash ID -> stash name table, filled while writing the stash lines. */
pgsa_stash_name_table_hash *nhash;
/* Scratch buffer; reset and reused for each output line. */
StringInfoData buf;
/* Number of "entry" lines written so far. */
int entries_written;
} pgsa_writer_context;
/*
 * A parsed entry line, with pointers into the slurp buffer.
 *
 * The strings are not copied: they stay valid only as long as the buffer
 * holding the file contents does.
 */
typedef struct pgsa_saved_entry
{
/* Name of the stash this entry belongs to. */
char *stash_name;
/* Query ID parsed from the line; the parser rejects zero. */
int64 queryId;
/* Advice string, already unescaped in place. */
char *advice_string;
} pgsa_saved_entry;
/*
 * simplehash for detecting duplicate stash names during parsing.
 * Keyed by stash name (char *), pointing into the slurp buffer.
 *
 * The same table is also used to check that each entry line refers to a
 * stash that has been seen, and to enumerate the stashes to be restored.
 */
typedef struct pgsa_saved_stash
{
/* Slot status word used by simplehash. */
uint32 status;
/* Stash name; this is the hash key. */
char *name;
} pgsa_saved_stash;
/* Keys are NUL-terminated strings: hash and compare them by content. */
#define SH_PREFIX pgsa_saved_stash_table
#define SH_ELEMENT_TYPE pgsa_saved_stash
#define SH_KEY_TYPE char *
#define SH_KEY name
#define SH_HASH_KEY(tb, key) hash_bytes((const unsigned char *) (key), strlen(key))
#define SH_EQUAL(tb, a, b) (strcmp(a, b) == 0)
#define SH_SCOPE static inline
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
extern PGDLLEXPORT void pg_stash_advice_worker_main(Datum main_arg);
static void pgsa_append_tsv_escaped_string(StringInfo buf, const char *str);
static void pgsa_detach_shmem(int code, Datum arg);
static char *pgsa_next_tsv_field(char **cursor);
static void pgsa_read_from_disk(void);
static void pgsa_restore_entries(pgsa_saved_entry *entries, int num_entries);
static void pgsa_restore_stashes(pgsa_saved_stash_table_hash *saved_stashes);
static void pgsa_unescape_tsv_field(char *str, const char *filename,
unsigned lineno);
static void pgsa_write_entries(pgsa_writer_context *wctx);
pg_noreturn static void pgsa_write_error(pgsa_writer_context *wctx);
static void pgsa_write_stashes(pgsa_writer_context *wctx);
static void pgsa_write_to_disk(void);
/*
 * Background worker entry point for pg_stash_advice persistence.
 *
 * On startup, if the stashes_ready flag in shared memory is not yet set, we
 * load previously saved stash data from disk and then set the flag. Then we
 * enter a loop, periodically checking whether any changes have been made
 * (via the change_count atomic counter) and writing them to disk. On
 * shutdown, we perform a final write.
 *
 * Only one worker may run at a time: if shared memory already records
 * another worker's PID, we log that fact and return without doing anything.
 *
 * main_arg is unused.
 */
PGDLLEXPORT void
pg_stash_advice_worker_main(Datum main_arg)
{
	uint64		last_change_count;
	TimestampTz last_write_time = 0;
	pid_t		other_pid;

	/* Establish signal handlers; once that's done, unblock signals. */
	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	BackgroundWorkerUnblockSignals();

	/* Log a debug message */
	ereport(DEBUG1,
			errmsg("pg_stash_advice worker started"));

	/* Set up session user so pgstat can report it. */
	InitializeSessionUserIdStandalone();

	/* Report this worker in pg_stat_activity. */
	pgstat_beinit();
	pgstat_bestart_initial();
	pgstat_bestart_final();

	/* Attach to shared memory structures. */
	pgsa_attach();

	/*
	 * Set on-detach hook so that our PID will be cleared on exit. The hook
	 * only clears the PID if it is ours, so it is harmless on the "already
	 * running" exit path below.
	 */
	before_shmem_exit(pgsa_detach_shmem, 0);

	/*
	 * Store our PID in shared memory, unless there's already another worker
	 * running, in which case just exit.
	 *
	 * Copy the other worker's PID while we still hold the lock, so that the
	 * log message cannot report a value that changed after we released it.
	 */
	LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE);
	other_pid = pgsa_state->bgworker_pid;
	if (other_pid != InvalidPid)
	{
		LWLockRelease(&pgsa_state->lock);
		ereport(LOG,
				(errmsg("pg_stash_advice worker is already running under PID %d",
						(int) other_pid)));
		return;
	}
	pgsa_state->bgworker_pid = MyProcPid;
	LWLockRelease(&pgsa_state->lock);

	/*
	 * If pg_stash_advice.persist was set to true during
	 * process_shared_preload_libraries() and the data has not yet been
	 * successfully loaded, load it now.
	 */
	if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready))
	{
		pgsa_read_from_disk();
		pg_atomic_test_set_flag(&pgsa_state->stashes_ready);
	}

	/* Note the current change count so we can detect future changes. */
	last_change_count = pg_atomic_read_u64(&pgsa_state->change_count);

	/* Periodically write to disk until terminated. */
	while (!ShutdownRequestPending)
	{
		/* In case of a SIGHUP, just reload the configuration. */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (pg_stash_advice_persist_interval <= 0)
		{
			/* Only writing at shutdown, so just wait forever. */
			(void) WaitLatch(MyLatch,
							 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
							 -1L,
							 PG_WAIT_EXTENSION);
		}
		else
		{
			TimestampTz next_write_time;
			long		delay_in_ms;
			uint64		current_change_count;

			/* Compute when the next write should happen. */
			next_write_time =
				TimestampTzPlusMilliseconds(last_write_time,
											pg_stash_advice_persist_interval * 1000);
			delay_in_ms =
				TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
												next_write_time);

			/*
			 * When we reach next_write_time, we always update last_write_time
			 * (which is really the time at which we last considered writing),
			 * but we only actually write to disk if something has changed.
			 *
			 * The change count is sampled before writing, so a modification
			 * that lands while the write is in progress is picked up by the
			 * next cycle.
			 */
			if (delay_in_ms <= 0)
			{
				current_change_count =
					pg_atomic_read_u64(&pgsa_state->change_count);
				if (current_change_count != last_change_count)
				{
					pgsa_write_to_disk();
					last_change_count = current_change_count;
				}
				last_write_time = GetCurrentTimestamp();
				continue;
			}

			/* Sleep until the next write time. */
			(void) WaitLatch(MyLatch,
							 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							 delay_in_ms,
							 PG_WAIT_EXTENSION);
		}

		ResetLatch(MyLatch);
	}

	/* Write one last time before exiting. */
	pgsa_write_to_disk();
}
/*
 * before_shmem_exit callback: forget our PID in shared memory.
 *
 * The PID is cleared only if it is still ours, so a worker that exits
 * because another worker was already registered leaves that worker's PID
 * alone. Neither argument is used.
 */
static void
pgsa_detach_shmem(int code, Datum arg)
{
	LWLock	   *state_lock = &pgsa_state->lock;

	LWLockAcquire(state_lock, LW_EXCLUSIVE);
	if (MyProcPid == pgsa_state->bgworker_pid)
		pgsa_state->bgworker_pid = InvalidPid;
	LWLockRelease(state_lock);
}
/*
 * Load advice stash data from a dump file on disk, if there is one.
 *
 * Shared memory is first cleared of all stashes and entries. If the dump
 * file does not exist, that is all that happens. Otherwise the file is read
 * into memory in one go and parsed in place. Each non-empty line is either
 *     stash<TAB>stash_name
 * or
 *     entry<TAB>stash_name<TAB>query_id<TAB>advice_string
 * and an entry line may only name a stash whose stash line appeared earlier
 * in the file. Only after the whole file has parsed cleanly are the stashes
 * and entries inserted into shared memory.
 *
 * Any I/O failure or malformed line is reported with ereport(ERROR); syntax
 * problems use ERRCODE_DATA_CORRUPTED and include the line number.
 */
static void
pgsa_read_from_disk(void)
{
struct stat statbuf;
FILE *file;
char *filebuf;
size_t nread;
char *p;
unsigned lineno;
pgsa_saved_stash_table_hash *saved_stashes;
int num_stashes = 0;
pgsa_saved_entry *entries;
int num_entries = 0;
int max_entries = 64;
MemoryContext tmpcxt;
MemoryContext oldcxt;
Assert(pgsa_entry_dshash != NULL);
/*
* Clear any existing shared memory state.
*
* Normally, there won't be any, but if this function was called before
* and failed after beginning to apply changes to shared memory, then we
* need to get rid of any entries created at that time before trying
* again.
*/
LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE);
pgsa_reset_all_stashes();
LWLockRelease(&pgsa_state->lock);
/* Open the dump file. If it doesn't exist, we're done. */
file = AllocateFile(PGSA_DUMP_FILE, "r");
if (!file)
{
if (errno == ENOENT)
return;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", PGSA_DUMP_FILE)));
}
/* Use a temporary context for all parse-phase allocations. */
tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
"pg_stash_advice load",
ALLOCSET_DEFAULT_SIZES);
oldcxt = MemoryContextSwitchTo(tmpcxt);
/* Figure out how long the file is. */
if (fstat(fileno(file), &statbuf) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", PGSA_DUMP_FILE)));
/*
* Slurp the entire file into memory all at once.
*
* We could avoid this by reading the file incrementally and applying
* changes to pgsa_stash_dshash and pgsa_entry_dshash as we go. Given the
* lockout mechanism implemented by stashes_ready, that shouldn't have any
* user-visible behavioral consequences, but it would consume shared
* memory to no benefit. It seems better to buffer everything in private
* memory first, and then only apply the changes once the file has been
* successfully parsed in its entirety.
*
* That also has the advantage of possibly being more future-proof: if we
* decide to remove the stashes_ready mechanism in the future, or say
* allow for multiple save files, fully validating the file before
* applying any changes will become much more important.
*
* Of course, this approach does have one major disadvantage, which is
* that we'll temporarily use about twice as much memory as we're
* ultimately going to need, but that seems like it shouldn't be a problem
* in practice. If there's so much stashed advice that parsing the disk
* file runs us out of memory, something has gone terribly wrong. In that
* situation, there probably also isn't enough free memory for the
* workload that the advice is attempting to manipulate to run
* successfully.
*/
/* One extra byte so the buffer can always be NUL-terminated below. */
filebuf = palloc_extended(statbuf.st_size + 1, MCXT_ALLOC_HUGE);
nread = fread(filebuf, 1, statbuf.st_size, file);
if (ferror(file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", PGSA_DUMP_FILE)));
FreeFile(file);
/* Terminate at the number of bytes actually read, not at st_size. */
filebuf[nread] = '\0';
/* Initial memory allocations. */
saved_stashes = pgsa_saved_stash_table_create(tmpcxt, 64, NULL);
entries = palloc(max_entries * sizeof(pgsa_saved_entry));
/*
* For memory and CPU efficiency, we parse the file in place. The end of
* each line gets replaced with a NUL byte, and then the end of each field
* within a line gets the same treatment. The advice string is unescaped
* in place, and stash names and query IDs can't contain any special
* characters. All of the resulting pointers point right back into the
* buffer; we only need additional memory to grow the 'entries' array and
* the 'saved_stashes' hash table.
*/
for (p = filebuf, lineno = 1; *p != '\0'; lineno++)
{
char *cursor = p;
char *eol;
char *line_type;
/* Find end of line and NUL-terminate. */
eol = strchr(p, '\n');
if (eol != NULL)
{
*eol = '\0';
p = eol + 1;
/* Tolerate CRLF line endings: also drop a CR just before the newline. */
if (eol > cursor && eol[-1] == '\r')
eol[-1] = '\0';
}
else
/* Final line without a newline: move p to the NUL so the loop ends. */
p += strlen(p);
/* Skip empty lines. */
if (*cursor == '\0')
continue;
/* First field is the type of line, either "stash" or "entry". */
/* The line is known to be non-empty here, so this cannot return NULL. */
line_type = pgsa_next_tsv_field(&cursor);
if (strcmp(line_type, "stash") == 0)
{
char *name;
bool found;
/* Second field should be the stash name. */
name = pgsa_next_tsv_field(&cursor);
if (name == NULL || *name == '\0')
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: expected stash name",
PGSA_DUMP_FILE, lineno)));
/* No further fields are expected. */
if (*cursor != '\0')
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: expected end of line",
PGSA_DUMP_FILE, lineno)));
/* Duplicate check. */
(void) pgsa_saved_stash_table_insert(saved_stashes, name, &found);
if (found)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: duplicate stash name \"%s\"",
PGSA_DUMP_FILE, lineno, name)));
num_stashes++;
}
else if (strcmp(line_type, "entry") == 0)
{
char *stash_name;
char *queryid_str;
char *advice_str;
char *endptr;
int64 queryId;
/* Second field should be the stash name. */
stash_name = pgsa_next_tsv_field(&cursor);
if (stash_name == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: expected stash name",
PGSA_DUMP_FILE, lineno)));
/* Third field should be the query ID. */
queryid_str = pgsa_next_tsv_field(&cursor);
if (queryid_str == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: expected query ID",
PGSA_DUMP_FILE, lineno)));
/* Fourth field should be the advice string. */
/*
* NOTE(review): pgsa_next_tsv_field() returns NULL when the cursor is
* already at end of line, so a line whose advice field is empty is
* rejected here. Confirm that the writer can never be asked to dump an
* empty advice string; if it can, the resulting file would not reload.
*/
advice_str = pgsa_next_tsv_field(&cursor);
if (advice_str == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: expected advice string",
PGSA_DUMP_FILE, lineno)));
/* No further fields are expected. */
if (*cursor != '\0')
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: expected end of line",
PGSA_DUMP_FILE, lineno)));
/* Make sure the stash is one we've actually seen. */
if (pgsa_saved_stash_table_lookup(saved_stashes,
stash_name) == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: unknown stash \"%s\"",
PGSA_DUMP_FILE, lineno, stash_name)));
/* Parse the query ID. */
/*
* Reject trailing garbage, conversion errors reported through errno,
* an empty field, and a query ID of zero.
*/
errno = 0;
queryId = strtoll(queryid_str, &endptr, 10);
if (*endptr != '\0' || errno != 0 || queryid_str == endptr ||
queryId == 0)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: invalid query ID \"%s\"",
PGSA_DUMP_FILE, lineno, queryid_str)));
/* Unescape the advice string. */
pgsa_unescape_tsv_field(advice_str, PGSA_DUMP_FILE, lineno);
/* Append to the entry array. */
/* Double the array whenever it is full. */
if (num_entries >= max_entries)
{
max_entries *= 2;
entries = repalloc(entries,
max_entries * sizeof(pgsa_saved_entry));
}
entries[num_entries].stash_name = stash_name;
entries[num_entries].queryId = queryId;
entries[num_entries].advice_string = advice_str;
num_entries++;
}
else
{
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("syntax error in file \"%s\" line %u: unrecognized line type",
PGSA_DUMP_FILE, lineno)));
}
}
/*
* Parsing succeeded. Apply everything to shared memory.
*
* At this point, we know that the file we just read is fully valid, but
* it's still possible for this to fail if, for example, DSA memory cannot
* be allocated. If that happens, the worker will die, the postmaster will
* eventually restart it, and we'll try again after clearing any data that
* we did manage to put into shared memory. (Note that we call
* pgsa_reset_all_stashes() at the top of this function.)
*/
/* Stashes go first, before the entries that refer to them by name. */
pgsa_restore_stashes(saved_stashes);
pgsa_restore_entries(entries, num_entries);
/* Hooray, it worked! Notify the user. */
ereport(LOG,
(errmsg("loaded %d advice stashes and %d entries from \"%s\"",
num_stashes, num_entries, PGSA_DUMP_FILE)));
/* Clean up. */
/* Deleting the context frees the file buffer, entry array and hash table. */
MemoryContextSwitchTo(oldcxt);
MemoryContextDelete(tmpcxt);
}
/*
* Write all advice stash data to disk.
*
* The file format is a simple TSV with a line-type prefix:
* stash\tstash_name
* entry\tstash_name\tquery_id\tadvice_string
*/
static void
pgsa_write_to_disk(void)
{
pgsa_writer_context wctx = {0};
MemoryContext tmpcxt;
MemoryContext oldcxt;
Assert(pgsa_entry_dshash != NULL);
/* Use a temporary context so all allocations are freed at the end. */
tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
"pg_stash_advice dump",
ALLOCSET_DEFAULT_SIZES);
oldcxt = MemoryContextSwitchTo(tmpcxt);
/* Set up the writer context. */
snprintf(wctx.pathname, MAXPGPATH, "%s.tmp", PGSA_DUMP_FILE);
wctx.file = AllocateFile(wctx.pathname, "w");
if (!wctx.file)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", wctx.pathname)));
wctx.nhash = pgsa_stash_name_table_create(tmpcxt, 64, NULL);
initStringInfo(&wctx.buf);
/* Write stash lines, then entry lines. */
pgsa_write_stashes(&wctx);
pgsa_write_entries(&wctx);
/*
* If nothing was written, remove both the temp file and any existing dump
* file rather than installing a zero-length file.
*/
if (wctx.nhash->members == 0)
{
ereport(DEBUG1,
errmsg("there are no advice stashes to save"));
FreeFile(wctx.file);
unlink(wctx.pathname);
if (unlink(PGSA_DUMP_FILE) == 0)
ereport(DEBUG1,
errmsg("removed \"%s\"", PGSA_DUMP_FILE));
}
else
{
if (FreeFile(wctx.file) != 0)
{
int save_errno = errno;
unlink(wctx.pathname);
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
wctx.pathname)));
}
(void) durable_rename(wctx.pathname, PGSA_DUMP_FILE, ERROR);
ereport(LOG,
errmsg("saved %d advice stashes and %d entries to \"%s\"",
(int) wctx.nhash->members, wctx.entries_written,
PGSA_DUMP_FILE));
}
MemoryContextSwitchTo(oldcxt);
MemoryContextDelete(tmpcxt);
}
/*
 * Append str to buf, escaping the characters that are special in our TSV
 * format.
 *
 * Backslash, tab, newline, and carriage return become the two-character
 * sequences \\, \t, \n, and \r respectively. Every other byte is copied
 * through unchanged.
 */
static void
pgsa_append_tsv_escaped_string(StringInfo buf, const char *str)
{
	const char *p = str;
	char		c;

	while ((c = *p++) != '\0')
	{
		char		esc;

		/* Decide which letter follows the backslash, if any. */
		if (c == '\\')
			esc = '\\';
		else if (c == '\t')
			esc = 't';
		else if (c == '\n')
			esc = 'n';
		else if (c == '\r')
			esc = 'r';
		else
		{
			/* Ordinary character: copy as-is. */
			appendStringInfoChar(buf, c);
			continue;
		}

		appendStringInfoChar(buf, '\\');
		appendStringInfoChar(buf, esc);
	}
}
/*
 * Split off the next tab-delimited field starting at *cursor.
 *
 * Returns a pointer to the start of the field, or NULL if *cursor already
 * points at the terminating NUL (no fields remain). If the field ends with
 * a tab, the tab is overwritten with '\0' and *cursor is left just past it;
 * otherwise *cursor is left at the string's terminating NUL. A field may be
 * empty if it is followed by a tab.
 */
static char *
pgsa_next_tsv_field(char **cursor)
{
	char	   *field = *cursor;
	size_t		len;

	if (field[0] == '\0')
		return NULL;

	/* Length of the run of bytes before the first tab, or end of string. */
	len = strcspn(field, "\t");

	/* Terminate the field at its delimiter and step over it. */
	if (field[len] == '\t')
		field[len++] = '\0';

	*cursor = field + len;
	return field;
}
/*
 * Install every parsed entry into shared memory.
 *
 * entries is an array of num_entries parsed entry lines. The shared-state
 * lock is held in shared mode for the whole batch.
 */
static void
pgsa_restore_entries(pgsa_saved_entry *entries, int num_entries)
{
	pgsa_saved_entry *cur = entries;
	pgsa_saved_entry *stop = entries + num_entries;

	LWLockAcquire(&pgsa_state->lock, LW_SHARED);
	while (cur < stop)
	{
		ereport(DEBUG2,
				errmsg("restoring advice stash entry for \"%s\", query ID %" PRId64,
					   cur->stash_name, cur->queryId));
		pgsa_set_advice_string(cur->stash_name,
							   cur->queryId,
							   cur->advice_string);
		cur++;
	}
	LWLockRelease(&pgsa_state->lock);
}
/*
 * Recreate in shared memory every stash recorded in the parsed hash table.
 *
 * The shared-state lock is held in exclusive mode while the stashes are
 * being created.
 */
static void
pgsa_restore_stashes(pgsa_saved_stash_table_hash *saved_stashes)
{
	pgsa_saved_stash_table_iterator it;
	pgsa_saved_stash *saved;

	LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE);

	pgsa_saved_stash_table_start_iterate(saved_stashes, &it);
	for (saved = pgsa_saved_stash_table_iterate(saved_stashes, &it);
		 saved != NULL;
		 saved = pgsa_saved_stash_table_iterate(saved_stashes, &it))
	{
		ereport(DEBUG2,
				errmsg("restoring advice stash \"%s\"", saved->name));
		pgsa_create_stash(saved->name);
	}

	LWLockRelease(&pgsa_state->lock);
}
/*
 * Decode the backslash escapes of a TSV field, overwriting it in place.
 *
 * The sequences \\, \t, \n, and \r are replaced by the character they stand
 * for. A backslash at the very end of the field, or followed by any other
 * character, is reported as a syntax error; filename and lineno are used
 * only for that error message.
 */
static void
pgsa_unescape_tsv_field(char *str, const char *filename, unsigned lineno)
{
	char	   *in = str;
	char	   *out = str;

	for (;;)
	{
		char		c = *in;

		if (c == '\0')
			break;

		/* Anything other than a backslash is copied straight through. */
		if (likely(c != '\\'))
		{
			*out++ = c;
			in++;
			continue;
		}

		/* Look at the character following the backslash. */
		c = in[1];
		if (c == '\\')
			*out++ = '\\';
		else if (c == 't')
			*out++ = '\t';
		else if (c == 'n')
			*out++ = '\n';
		else if (c == 'r')
			*out++ = '\r';
		else if (c == '\0')
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("syntax error in file \"%s\" line %u: trailing backslash",
							filename, lineno)));
		else
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("syntax error in file \"%s\" line %u: unrecognized escape \"\\%c\"",
							filename, lineno, c)));

		/* Step over both the backslash and the escaped character. */
		in += 2;
	}

	*out = '\0';
}
/*
 * Emit one "entry" line per advice entry in shared memory.
 *
 * Entries with no advice string are skipped, as are entries whose stash ID
 * is not present in wctx->nhash. A write failure is reported through
 * pgsa_write_error(), which does not return.
 */
static void
pgsa_write_entries(pgsa_writer_context *wctx)
{
	dshash_seq_status scan;
	StringInfo	line = &wctx->buf;

	dshash_seq_init(&scan, pgsa_entry_dshash, false);
	for (;;)
	{
		pgsa_entry *e = dshash_seq_next(&scan);
		pgsa_stash_name *owner;

		if (e == NULL)
			break;

		/* Nothing to save for an entry without an advice string. */
		if (!DsaPointerIsValid(e->advice_string))
			continue;

		/* Orphan entries (stash ID with no known name) are skipped. */
		owner = pgsa_stash_name_table_lookup(wctx->nhash,
											 e->key.pgsa_stash_id);
		if (owner == NULL)
			continue;

		/* Build the line in the scratch buffer. */
		resetStringInfo(line);
		appendStringInfo(line, "entry\t%s\t%" PRId64 "\t",
						 owner->name, e->key.queryId);
		pgsa_append_tsv_escaped_string(line,
									   dsa_get_address(pgsa_dsa_area,
													   e->advice_string));
		appendStringInfoChar(line, '\n');

		/* Write it out, bailing on any stream error. */
		fwrite(line->data, 1, line->len, wctx->file);
		if (ferror(wctx->file))
			pgsa_write_error(wctx);

		wctx->entries_written++;
	}
	dshash_seq_term(&scan);
}
/*
 * Abandon a failed write: close the output file, remove it, and raise an
 * ERROR naming the file. Does not return.
 *
 * errno is expected to describe the write failure on entry. It is saved
 * before the cleanup calls and put back afterwards, so that %m in the
 * message refers to the original failure.
 */
static void
pgsa_write_error(pgsa_writer_context *wctx)
{
	int			write_errno = errno;

	/* Get rid of the partially written file. */
	FreeFile(wctx->file);
	unlink(wctx->pathname);

	errno = write_errno;
	ereport(ERROR,
			errcode_for_file_access(),
			errmsg("could not write to file \"%s\": %m", wctx->pathname));
}
/*
 * Emit one "stash" line per advice stash found in pgsa_stash_dshash.
 *
 * As a side effect, every stash is recorded in wctx->nhash under its stash
 * ID, together with a private copy of its name, so that pgsa_write_entries
 * can later translate stash IDs back into names. A write failure is handed
 * to pgsa_write_error.
 */
static void
pgsa_write_stashes(pgsa_writer_context *wctx)
{
	dshash_seq_status scan;
	pgsa_stash *cur;

	dshash_seq_init(&scan, pgsa_stash_dshash, false);

	for (cur = dshash_seq_next(&scan);
		 cur != NULL;
		 cur = dshash_seq_next(&scan))
	{
		bool		already_present;
		pgsa_stash_name *slot;

		/* Remember the ID-to-name mapping; each ID should appear once. */
		slot = pgsa_stash_name_table_insert(wctx->nhash,
											cur->pgsa_stash_id,
											&already_present);
		Assert(!already_present);
		slot->name = pstrdup(cur->name);

		/* Build and write the line for this stash. */
		resetStringInfo(&wctx->buf);
		appendStringInfo(&wctx->buf, "stash\t%s\n", slot->name);

		fwrite(wctx->buf.data, 1, wctx->buf.len, wctx->file);
		if (ferror(wctx->file))
			pgsa_write_error(wctx);
	}

	dshash_seq_term(&scan);
}

View file

@ -0,0 +1,84 @@
# Copyright (c) 2016-2026, PostgreSQL Global Development Group

# Checks that advice stashes and their entries survive server restarts when
# pg_stash_advice.persist is enabled, and that the dump file is gone once
# every stash has been dropped.

use strict;
use warnings FATAL => 'all';

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

my $db = 'postgres';

# Reports each stash together with its number of entries.
my $stash_counts_sql =
  "SELECT stash_name, num_entries FROM pg_get_advice_stashes() ORDER BY stash_name";

# Reports every entry of every stash.
my $stash_contents_sql =
  "SELECT stash_name, query_id, advice_string FROM pg_get_advice_stash_contents(NULL) ORDER BY stash_name, query_id";

my $node = PostgreSQL::Test::Cluster->new('main');
$node->init;
$node->append_conf(
	'postgresql.conf',
	qq{shared_preload_libraries = 'pg_plan_advice, pg_stash_advice'
pg_stash_advice.persist = true
pg_stash_advice.persist_interval = 0});
$node->start;

$node->safe_psql($db, "CREATE EXTENSION pg_stash_advice;\n");

# Populate two stashes: stash_a receives two entries (one of them holding a
# newline, a tab and a backslash), stash_b receives a single entry.
$node->safe_psql($db, qq{
SELECT pg_create_advice_stash('stash_a');
SELECT pg_set_stashed_advice('stash_a', 1001, 'IndexScan(t)');
SELECT pg_set_stashed_advice('stash_a', 1002, E'line1\\nline2\\ttab\\\\backslash');
SELECT pg_create_advice_stash('stash_b');
SELECT pg_set_stashed_advice('stash_b', 2001, 'SeqScan(t)');
});

# Baseline: both stashes are visible before any restart.
my $counts = $node->safe_psql($db, $stash_counts_sql);
is($counts, "stash_a|2\nstash_b|1", 'stashes present before restart');

# First restart: wait for the reload to be logged, then compare against the
# baseline.
$node->restart;
$node->wait_for_log("loaded 2 advice stashes and 3 entries");
$counts = $node->safe_psql($db, $stash_counts_sql);
is($counts, "stash_a|2\nstash_b|1", 'stashes survived restart');

# The entry contents must match too, special characters included.
my $contents = $node->safe_psql($db, $stash_contents_sql);
is( $contents,
	"stash_a|1001|IndexScan(t)\nstash_a|1002|line1\nline2\ttab\\backslash\nstash_b|2001|SeqScan(t)",
	'entry contents survived restart with special characters intact');

# A stash without any entries must be persisted as well.
$node->safe_psql($db, qq{
SELECT pg_create_advice_stash('stash_c');
});

# Second restart: all three stashes are expected back.
$node->restart;
$node->wait_for_log("loaded 3 advice stashes and 3 entries");
$counts = $node->safe_psql($db, $stash_counts_sql);
is( $counts,
	"stash_a|2\nstash_b|1\nstash_c|0",
	'all three stashes survived second restart');

# Drop everything, restart once more, and confirm that neither the stashes
# nor the dump file remain.
$node->safe_psql($db, qq{
SELECT pg_drop_advice_stash('stash_a');
SELECT pg_drop_advice_stash('stash_b');
SELECT pg_drop_advice_stash('stash_c');
});
$node->restart;

my $stash_count =
  $node->safe_psql($db, "SELECT count(*) FROM pg_get_advice_stashes()");
is($stash_count, "0", 'no stashes after dropping all and restarting');

my $dump_file = $node->data_dir . '/pg_stash_advice.tsv';
ok(!-f $dump_file, 'dump file removed after all stashes dropped');

$node->stop;

done_testing();

View file

@ -15,10 +15,12 @@
<link linkend="guc-compute-query-id">query identifiers</link> to plan advice
strings. Whenever a session is asked to plan a query whose query ID appears
in the relevant advice stash, the plan advice string is automatically applied
to guide planning. Note that advice stashes exist purely in memory. This
means both that it is important to be mindful of memory consumption when
deciding how much plan advice to stash, and also that advice stashes must
be recreated and repopulated whenever the server is restarted.
to guide planning. Note that advice stashes are stored in dynamically
allocated shared memory. This means that it is important to be mindful
of memory consumption when deciding how much plan advice to stash.
Optionally, advice stashes and their contents can automatically be persisted
to disk and reloaded from disk; see
<literal>pg_stash_advice.persist</literal>, below.
</para>
<para>
@ -175,6 +177,28 @@
</listitem>
</varlistentry>
<varlistentry>
<term>
<function>pg_start_stash_advice_worker() returns void</function>
<indexterm>
<primary>pg_start_stash_advice_worker</primary>
</indexterm>
</term>
<listitem>
<para>
Starts the background worker, so that advice stash contents can be
automatically persisted to disk. If this module is included in
<xref linkend="guc-shared-preload-libraries"/> at startup time with
<literal>pg_stash_advice.persist = true</literal>, the worker will be
started automatically. When started manually, the worker will not load
anything from disk, but it will still persist data to disk. You can then
configure the server to start the worker automatically after the next
restart, preserving any stashed advice you add now.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
@ -184,6 +208,44 @@
<variablelist>
<varlistentry>
<term>
<varname>pg_stash_advice.persist</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>pg_stash_advice.persist</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Controls whether the advice stashes and stash entries should be
persisted to disk. This is on by default. If any stashes are persisted,
a file named <literal>pg_stash_advice.tsv</literal> will be created in
the data directory. Stashes are loaded and saved using a background
worker process. This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<varname>pg_stash_advice.persist_interval</varname> (<type>integer</type>)
<indexterm>
<primary><varname>pg_stash_advice.persist_interval</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the interval, in seconds, between checks for changes that
need to be written to <literal>pg_stash_advice.tsv</literal>. If set to
zero, changes are only written when the server shuts down. The default
value is <literal>30</literal>. This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command line.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<varname>pg_stash_advice.stash_name</varname> (<type>string</type>)

View file

@ -4087,10 +4087,14 @@ pgpa_trove_slice
pgpa_unrolled_join
pgsa_entry
pgsa_entry_key
pgsa_saved_entry
pgsa_saved_stash
pgsa_saved_stash_table_hash
pgsa_shared_state
pgsa_stash
pgsa_stash_count
pgsa_stash_name
pgsa_writer_context
pgsocket
pgsql_thing_t
pgssEntry