diff --git a/contrib/pg_stash_advice/Makefile b/contrib/pg_stash_advice/Makefile index f7670c2d4b6..470c07b9dd7 100644 --- a/contrib/pg_stash_advice/Makefile +++ b/contrib/pg_stash_advice/Makefile @@ -4,13 +4,15 @@ MODULE_big = pg_stash_advice OBJS = \ $(WIN32RES) \ pg_stash_advice.o \ - stashfuncs.o + stashfuncs.o \ + stashpersist.o EXTENSION = pg_stash_advice DATA = pg_stash_advice--1.0.sql PGFILEDESC = "pg_stash_advice - store and automatically apply plan advice" REGRESS = pg_stash_advice pg_stash_advice_utf8 +TAP_TESTS = 1 EXTRA_INSTALL = contrib/pg_plan_advice ifdef USE_PGXS diff --git a/contrib/pg_stash_advice/meson.build b/contrib/pg_stash_advice/meson.build index 8fbcfcf8693..96f485b7729 100644 --- a/contrib/pg_stash_advice/meson.build +++ b/contrib/pg_stash_advice/meson.build @@ -2,7 +2,8 @@ pg_stash_advice_sources = files( 'pg_stash_advice.c', - 'stashfuncs.c' + 'stashfuncs.c', + 'stashpersist.c' ) if host_system == 'windows' @@ -34,4 +35,9 @@ tests += { 'pg_stash_advice_utf8', ], }, + 'tap': { + 'tests': [ + 't/001_persist.pl', + ], + }, } diff --git a/contrib/pg_stash_advice/pg_stash_advice--1.0.sql b/contrib/pg_stash_advice/pg_stash_advice--1.0.sql index 88dedd8ef1b..50f12dac313 100644 --- a/contrib/pg_stash_advice/pg_stash_advice--1.0.sql +++ b/contrib/pg_stash_advice/pg_stash_advice--1.0.sql @@ -36,8 +36,14 @@ RETURNS SETOF record AS 'MODULE_PATHNAME', 'pg_get_advice_stash_contents' LANGUAGE C; +CREATE FUNCTION pg_start_stash_advice_worker() +RETURNS void +AS 'MODULE_PATHNAME', 'pg_start_stash_advice_worker' +LANGUAGE C STRICT; + REVOKE ALL ON FUNCTION pg_create_advice_stash(text) FROM PUBLIC; REVOKE ALL ON FUNCTION pg_drop_advice_stash(text) FROM PUBLIC; REVOKE ALL ON FUNCTION pg_get_advice_stash_contents(text) FROM PUBLIC; REVOKE ALL ON FUNCTION pg_get_advice_stashes() FROM PUBLIC; REVOKE ALL ON FUNCTION pg_set_stashed_advice(text, bigint, text) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_start_stash_advice_worker() FROM PUBLIC; diff --git a/contrib/pg_stash_advice/pg_stash_advice.c b/contrib/pg_stash_advice/pg_stash_advice.c index 715e6a2d19e..1858c6a135a 100644 --- a/contrib/pg_stash_advice/pg_stash_advice.c +++ b/contrib/pg_stash_advice/pg_stash_advice.c @@ -13,9 +13,11 @@ #include "common/hashfn.h" #include "common/string.h" +#include "miscadmin.h" #include "nodes/queryjumble.h" #include "pg_plan_advice.h" #include "pg_stash_advice.h" +#include "postmaster/bgworker.h" #include "storage/dsm_registry.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -41,8 +43,10 @@ static dshash_parameters pgsa_entry_dshash_parameters = { LWTRANCHE_INVALID /* gets set at runtime */ }; -/* GUC variable */ +/* GUC variables */ static char *pg_stash_advice_stash_name = ""; +bool pg_stash_advice_persist = true; +int pg_stash_advice_persist_interval = 30; /* Shared memory pointers */ pgsa_shared_state *pgsa_state; @@ -87,6 +91,33 @@ _PG_init(void) EnableQueryId(); /* Define our GUCs. */ + if (process_shared_preload_libraries_in_progress) + DefineCustomBoolVariable("pg_stash_advice.persist", + "Save and restore advice stash contents across restarts.", + NULL, + &pg_stash_advice_persist, + true, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL); + else + pg_stash_advice_persist = false; + + DefineCustomIntVariable("pg_stash_advice.persist_interval", + "Interval between advice stash saves, in seconds.", + NULL, + &pg_stash_advice_persist_interval, + 30, + 0, + 3600, + PGC_SIGHUP, + GUC_UNIT_S, + NULL, + NULL, + NULL); + DefineCustomStringVariable("pg_stash_advice.stash_name", "Name of the advice stash to be used in this session.", NULL, @@ -100,6 +131,10 @@ _PG_init(void) MarkGUCPrefixReserved("pg_stash_advice"); + /* Start the background worker for persistence, if enabled. */ + if (pg_stash_advice_persist) + pgsa_start_worker(); + /* Tell pg_plan_advice that we want to provide advice strings. */ add_advisor_fn = load_external_function("pg_plan_advice", "pg_plan_advice_add_advisor", @@ -131,6 +166,10 @@ pgsa_advisor(PlannerGlobal *glob, Query *parse, if (unlikely(pgsa_entry_dshash == NULL)) pgsa_attach(); + /* If stash data is still being restored from disk, ignore. */ + if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready)) + return NULL; + /* * Translate pg_stash_advice.stash_name to an integer ID. * @@ -279,6 +318,19 @@ pgsa_attach(void) MemoryContextSwitchTo(oldcontext); } +/* + * Error out if the stashes have not been loaded from disk yet. + */ +void +pgsa_check_lockout(void) +{ + if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("stash modifications are not allowed because \"%s\" has not been loaded yet", + PGSA_DUMP_FILE))); +} + /* * Check whether an advice stash name is legal, and signal an error if not. * @@ -383,6 +435,9 @@ pgsa_create_stash(char *stash_name) errmsg("advice stash \"%s\" already exists", stash_name)); stash->pgsa_stash_id = pgsa_state->next_stash_id++; dshash_release_lock(pgsa_stash_dshash, stash); + + /* Bump change count. */ + pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1); } /* @@ -423,6 +478,9 @@ pgsa_clear_advice_string(char *stash_name, int64 queryId) /* Now we free the advice string as well, if there was one. */ if (old_dp != InvalidDsaPointer) dsa_free(pgsa_dsa_area, old_dp); + + /* Bump change count. */ + pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1); } /* @@ -464,6 +522,43 @@ pgsa_drop_stash(char *stash_name) } } dshash_seq_term(&iterator); + + /* Bump change count. */ + pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1); +} + +/* + * Remove all stashes and entries from shared memory. + * + * This is intended to be called before reloading from a dump file, so that + * a failed previous attempt doesn't leave stale data behind. + */ +void +pgsa_reset_all_stashes(void) +{ + dshash_seq_status iter; + pgsa_entry *entry; + + Assert(LWLockHeldByMeInMode(&pgsa_state->lock, LW_EXCLUSIVE)); + + /* Remove all stashes. */ + dshash_seq_init(&iter, pgsa_stash_dshash, true); + while (dshash_seq_next(&iter) != NULL) + dshash_delete_current(&iter); + dshash_seq_term(&iter); + + /* Remove all entries. */ + dshash_seq_init(&iter, pgsa_entry_dshash, true); + while ((entry = dshash_seq_next(&iter)) != NULL) + { + if (entry->advice_string != InvalidDsaPointer) + dsa_free(pgsa_dsa_area, entry->advice_string); + dshash_delete_current(&iter); + } + dshash_seq_term(&iter); + + /* Reset the stash ID counter. */ + pgsa_state->next_stash_id = UINT64CONST(1); } /* @@ -483,6 +578,23 @@ pgsa_init_shared_state(void *ptr, void *arg) state->area = DSA_HANDLE_INVALID; state->stash_hash = DSHASH_HANDLE_INVALID; state->entry_hash = DSHASH_HANDLE_INVALID; + state->bgworker_pid = InvalidPid; + pg_atomic_init_flag(&state->stashes_ready); + pg_atomic_init_u64(&state->change_count, 0); + + /* + * If this module was loaded via shared_preload_libraries, then + * pg_stash_advice_persist is a GUC variable. If it's true, that means + * that we should lock out manual stash modifications until the dump file + * has been successfully loaded. If it's false, there's nothing to load, + * so we set stashes_ready immediately. + * + * If this module was not loaded via shared_preload_libraries, then + * pg_stash_advice_persist is not a GUC variable, but it will be false, + * which leads to the correct behavior. + */ + if (!pg_stash_advice_persist) + pg_atomic_test_set_flag(&state->stashes_ready); } /* @@ -602,4 +714,60 @@ pgsa_set_advice_string(char *stash_name, int64 queryId, char *advice_string) */ if (DsaPointerIsValid(old_dp)) dsa_free(pgsa_dsa_area, old_dp); + + /* Bump change count. */ + pg_atomic_add_fetch_u64(&pgsa_state->change_count, 1); +} + +/* + * Start our worker process. + */ +void +pgsa_start_worker(void) +{ + BackgroundWorker worker = {0}; + BackgroundWorkerHandle *handle; + BgwHandleStatus status; + pid_t pid; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; + strcpy(worker.bgw_library_name, "pg_stash_advice"); + strcpy(worker.bgw_function_name, "pg_stash_advice_worker_main"); + strcpy(worker.bgw_name, "pg_stash_advice worker"); + strcpy(worker.bgw_type, "pg_stash_advice worker"); + + /* + * If process_shared_preload_libraries_in_progress = true, we may be in + * the postmaster, in which case this will really register the worker, or + * we may be in a child process in an EXEC_BACKEND build, in which case it + * will silently do nothing (which is the correct behavior). + */ + if (process_shared_preload_libraries_in_progress) + { + RegisterBackgroundWorker(&worker); + return; + } + + /* + * If process_shared_preload_libraries_in_progress = false, we're being + * asked to start the worker after system startup time. In other words, + * unless this is single-user mode, we're not in the postmaster, so we + * should use RegisterDynamicBackgroundWorker and then wait for startup to + * complete. (If we do happen to be in single-user mode, this will error + * out, which is fine.) + */ + worker.bgw_notify_pid = MyProcPid; + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You may need to increase \"max_worker_processes\"."))); + status = WaitForBackgroundWorkerStartup(handle, &pid); + if (status != BGWH_STARTED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start background process"), + errhint("More details may be available in the server log."))); } diff --git a/contrib/pg_stash_advice/pg_stash_advice.h b/contrib/pg_stash_advice/pg_stash_advice.h index eeaa61e0f37..01aded472f3 100644 --- a/contrib/pg_stash_advice/pg_stash_advice.h +++ b/contrib/pg_stash_advice/pg_stash_advice.h @@ -22,6 +22,8 @@ #include "lib/dshash.h" #include "storage/lwlock.h" +#define PGSA_DUMP_FILE "pg_stash_advice.tsv" + /* * The key that we use to find a particular stash entry. */ @@ -62,6 +64,9 @@ typedef struct pgsa_shared_state dsa_handle area; dshash_table_handle stash_hash; dshash_table_handle entry_hash; + pid_t bgworker_pid; + pg_atomic_flag stashes_ready; + pg_atomic_uint64 change_count; } pgsa_shared_state; /* For stash ID -> stash name hash table */ @@ -86,14 +91,21 @@ extern dsa_area *pgsa_dsa_area; extern dshash_table *pgsa_stash_dshash; extern dshash_table *pgsa_entry_dshash; +/* GUC variables */ +extern bool pg_stash_advice_persist; +extern int pg_stash_advice_persist_interval; + /* Function prototypes */ extern void pgsa_attach(void); +extern void pgsa_check_lockout(void); extern void pgsa_check_stash_name(char *stash_name); extern void pgsa_clear_advice_string(char *stash_name, int64 queryId); extern void pgsa_create_stash(char *stash_name); extern void pgsa_drop_stash(char *stash_name); extern uint64 pgsa_lookup_stash_id(char *stash_name); +extern void pgsa_reset_all_stashes(void); extern void pgsa_set_advice_string(char *stash_name, int64 queryId, char *advice_string); +extern void pgsa_start_worker(void); #endif diff --git a/contrib/pg_stash_advice/stashfuncs.c b/contrib/pg_stash_advice/stashfuncs.c index 33e86abd9d4..77f8e19e867 100644 --- a/contrib/pg_stash_advice/stashfuncs.c +++ b/contrib/pg_stash_advice/stashfuncs.c @@ -14,6 +14,7 @@ #include "common/hashfn.h" #include "fmgr.h" #include "funcapi.h" +#include "miscadmin.h" #include "pg_stash_advice.h" #include "utils/builtins.h" #include "utils/tuplestore.h" @@ -23,6 +24,7 @@ PG_FUNCTION_INFO_V1(pg_drop_advice_stash); PG_FUNCTION_INFO_V1(pg_get_advice_stash_contents); PG_FUNCTION_INFO_V1(pg_get_advice_stashes); PG_FUNCTION_INFO_V1(pg_set_stashed_advice); +PG_FUNCTION_INFO_V1(pg_start_stash_advice_worker); typedef struct pgsa_stash_count { @@ -53,6 +55,7 @@ pg_create_advice_stash(PG_FUNCTION_ARGS) pgsa_check_stash_name(stash_name); if (unlikely(pgsa_entry_dshash == NULL)) pgsa_attach(); + pgsa_check_lockout(); LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE); pgsa_create_stash(stash_name); LWLockRelease(&pgsa_state->lock); @@ -70,6 +73,7 @@ pg_drop_advice_stash(PG_FUNCTION_ARGS) pgsa_check_stash_name(stash_name); if (unlikely(pgsa_entry_dshash == NULL)) pgsa_attach(); + pgsa_check_lockout(); LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE); pgsa_drop_stash(stash_name); LWLockRelease(&pgsa_state->lock); @@ -94,6 +98,10 @@ pg_get_advice_stashes(PG_FUNCTION_ARGS) if (unlikely(pgsa_entry_dshash == NULL)) pgsa_attach(); + /* If stash data is still being restored from disk, ignore. */ + if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready)) + return (Datum) 0; + /* Tally up the number of entries per stash. */ chash = pgsa_stash_count_table_create(CurrentMemoryContext, 64, NULL); dshash_seq_init(&iterator, pgsa_entry_dshash, true); @@ -154,6 +162,10 @@ pg_get_advice_stash_contents(PG_FUNCTION_ARGS) if (unlikely(pgsa_entry_dshash == NULL)) pgsa_attach(); + /* If stash data is still being restored from disk, ignore. */ + if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready)) + return (Datum) 0; + /* User can pass NULL for all stashes, or the name of a specific stash. */ if (!PG_ARGISNULL(0)) { @@ -286,6 +298,9 @@ pg_set_stashed_advice(PG_FUNCTION_ARGS) if (unlikely(pgsa_entry_dshash == NULL)) pgsa_attach(); + /* Don't allow writes if stash data is still being restored from disk. */ + pgsa_check_lockout(); + /* Now call the appropriate function to do the real work. */ if (PG_ARGISNULL(2)) { @@ -304,3 +319,29 @@ pg_set_stashed_advice(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +/* + * SQL-callable function to start the persistence background worker. + */ +Datum +pg_start_stash_advice_worker(PG_FUNCTION_ARGS) +{ + pid_t pid; + + if (unlikely(pgsa_entry_dshash == NULL)) + pgsa_attach(); + + LWLockAcquire(&pgsa_state->lock, LW_SHARED); + pid = pgsa_state->bgworker_pid; + LWLockRelease(&pgsa_state->lock); + + if (pid != InvalidPid) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("pg_stash_advice worker is already running under PID %d", + (int) pid))); + + pgsa_start_worker(); + + PG_RETURN_VOID(); +} diff --git a/contrib/pg_stash_advice/stashpersist.c b/contrib/pg_stash_advice/stashpersist.c new file mode 100644 index 00000000000..07a4da65b7e --- /dev/null +++ b/contrib/pg_stash_advice/stashpersist.c @@ -0,0 +1,800 @@ +/*------------------------------------------------------------------------- + * + * stashpersist.c + * Persistence support for pg_stash_advice. + * + * Copyright (c) 2016-2026, PostgreSQL Global Development Group + * + * contrib/pg_stash_advice/stashpersist.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "common/hashfn.h" +#include "miscadmin.h" +#include "pg_stash_advice.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "storage/procsignal.h" +#include "utils/backend_status.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/timestamp.h" + +typedef struct pgsa_writer_context +{ + char pathname[MAXPGPATH]; + FILE *file; + pgsa_stash_name_table_hash *nhash; + StringInfoData buf; + int entries_written; +} pgsa_writer_context; + +/* + * A parsed entry line, with pointers into the slurp buffer. + */ +typedef struct pgsa_saved_entry +{ + char *stash_name; + int64 queryId; + char *advice_string; +} pgsa_saved_entry; + +/* + * simplehash for detecting duplicate stash names during parsing. + * Keyed by stash name (char *), pointing into the slurp buffer. + */ +typedef struct pgsa_saved_stash +{ + uint32 status; + char *name; +} pgsa_saved_stash; + +#define SH_PREFIX pgsa_saved_stash_table +#define SH_ELEMENT_TYPE pgsa_saved_stash +#define SH_KEY_TYPE char * +#define SH_KEY name +#define SH_HASH_KEY(tb, key) hash_bytes((const unsigned char *) (key), strlen(key)) +#define SH_EQUAL(tb, a, b) (strcmp(a, b) == 0) +#define SH_SCOPE static inline +#define SH_DEFINE +#define SH_DECLARE +#include "lib/simplehash.h" + +extern PGDLLEXPORT void pg_stash_advice_worker_main(Datum main_arg); +static void pgsa_append_tsv_escaped_string(StringInfo buf, const char *str); +static void pgsa_detach_shmem(int code, Datum arg); +static char *pgsa_next_tsv_field(char **cursor); +static void pgsa_read_from_disk(void); +static void pgsa_restore_entries(pgsa_saved_entry *entries, int num_entries); +static void pgsa_restore_stashes(pgsa_saved_stash_table_hash *saved_stashes); +static void pgsa_unescape_tsv_field(char *str, const char *filename, + unsigned lineno); +static void pgsa_write_entries(pgsa_writer_context *wctx); +pg_noreturn static void pgsa_write_error(pgsa_writer_context *wctx); +static void pgsa_write_stashes(pgsa_writer_context *wctx); +static void pgsa_write_to_disk(void); + +/* + * Background worker entry point for pg_stash_advice persistence. + * + * On startup, if load_from_disk_pending is set, we load previously saved + * stash data from disk. Then we enter a loop, periodically checking whether + * any changes have been made (via the change_count atomic counter) and + * writing them to disk. On shutdown, we perform a final write. + */ +PGDLLEXPORT void +pg_stash_advice_worker_main(Datum main_arg) +{ + uint64 last_change_count; + TimestampTz last_write_time = 0; + + /* Establish signal handlers; once that's done, unblock signals. */ + pqsignal(SIGTERM, SignalHandlerForShutdownRequest); + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + BackgroundWorkerUnblockSignals(); + + /* Log a debug message */ + ereport(DEBUG1, + errmsg("pg_stash_advice worker started")); + + /* Set up session user so pgstat can report it. */ + InitializeSessionUserIdStandalone(); + + /* Report this worker in pg_stat_activity. */ + pgstat_beinit(); + pgstat_bestart_initial(); + pgstat_bestart_final(); + + /* Attach to shared memory structures. */ + pgsa_attach(); + + /* Set on-detach hook so that our PID will be cleared on exit. */ + before_shmem_exit(pgsa_detach_shmem, 0); + + /* + * Store our PID in shared memory, unless there's already another worker + * running, in which case just exit. + */ + LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE); + if (pgsa_state->bgworker_pid != InvalidPid) + { + LWLockRelease(&pgsa_state->lock); + ereport(LOG, + (errmsg("pg_stash_advice worker is already running under PID %d", + (int) pgsa_state->bgworker_pid))); + return; + } + pgsa_state->bgworker_pid = MyProcPid; + LWLockRelease(&pgsa_state->lock); + + /* + * If pg_stash_advice.persist was set to true during + * process_shared_preload_libraries() and the data has not yet been + * successfully loaded, load it now. + */ + if (pg_atomic_unlocked_test_flag(&pgsa_state->stashes_ready)) + { + pgsa_read_from_disk(); + pg_atomic_test_set_flag(&pgsa_state->stashes_ready); + } + + /* Note the current change count so we can detect future changes. */ + last_change_count = pg_atomic_read_u64(&pgsa_state->change_count); + + /* Periodically write to disk until terminated. */ + while (!ShutdownRequestPending) + { + /* In case of a SIGHUP, just reload the configuration. */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + if (pg_stash_advice_persist_interval <= 0) + { + /* Only writing at shutdown, so just wait forever. */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, + -1L, + PG_WAIT_EXTENSION); + } + else + { + TimestampTz next_write_time; + long delay_in_ms; + uint64 current_change_count; + + /* Compute when the next write should happen. */ + next_write_time = + TimestampTzPlusMilliseconds(last_write_time, + pg_stash_advice_persist_interval * 1000); + delay_in_ms = + TimestampDifferenceMilliseconds(GetCurrentTimestamp(), + next_write_time); + + /* + * When we reach next_write_time, we always update last_write_time + * (which is really the time at which we last considered writing), + * but we only actually write to disk if something has changed. + */ + if (delay_in_ms <= 0) + { + current_change_count = + pg_atomic_read_u64(&pgsa_state->change_count); + if (current_change_count != last_change_count) + { + pgsa_write_to_disk(); + last_change_count = current_change_count; + } + last_write_time = GetCurrentTimestamp(); + continue; + } + + /* Sleep until the next write time. */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_in_ms, + PG_WAIT_EXTENSION); + } + + ResetLatch(MyLatch); + } + + /* Write one last time before exiting. */ + pgsa_write_to_disk(); +} + +/* + * Clear our PID from shared memory on exit. + */ +static void +pgsa_detach_shmem(int code, Datum arg) +{ + LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE); + if (pgsa_state->bgworker_pid == MyProcPid) + pgsa_state->bgworker_pid = InvalidPid; + LWLockRelease(&pgsa_state->lock); +} + +/* + * Load advice stash data from a dump file on disk, if there is one. + */ +static void +pgsa_read_from_disk(void) +{ + struct stat statbuf; + FILE *file; + char *filebuf; + size_t nread; + char *p; + unsigned lineno; + pgsa_saved_stash_table_hash *saved_stashes; + int num_stashes = 0; + pgsa_saved_entry *entries; + int num_entries = 0; + int max_entries = 64; + MemoryContext tmpcxt; + MemoryContext oldcxt; + + Assert(pgsa_entry_dshash != NULL); + + /* + * Clear any existing shared memory state. + * + * Normally, there won't be any, but if this function was called before + * and failed after beginning to apply changes to shared memory, then we + * need to get rid of any entries created at that time before trying + * again. + */ + LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE); + pgsa_reset_all_stashes(); + LWLockRelease(&pgsa_state->lock); + + /* Open the dump file. If it doesn't exist, we're done. */ + file = AllocateFile(PGSA_DUMP_FILE, "r"); + if (!file) + { + if (errno == ENOENT) + return; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", PGSA_DUMP_FILE))); + } + + /* Use a temporary context for all parse-phase allocations. */ + tmpcxt = AllocSetContextCreate(CurrentMemoryContext, + "pg_stash_advice load", + ALLOCSET_DEFAULT_SIZES); + oldcxt = MemoryContextSwitchTo(tmpcxt); + + /* Figure out how long the file is. */ + if (fstat(fileno(file), &statbuf) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", PGSA_DUMP_FILE))); + + /* + * Slurp the entire file into memory all at once. + * + * We could avoid this by reading the file incrementally and applying + * changes to pgsa_stash_dshash and pgsa_entry_dshash as we go. Given the + * lockout mechanism implemented by stashes_ready, that shouldn't have any + * user-visible behavioral consequences, but it would consume shared + * memory to no benefit. It seems better to buffer everything in private + * memory first, and then only apply the changes once the file has been + * successfully parsed in its entirety. + * + * That also has the advantage of possibly being more future-proof: if we + * decide to remove the stashes_ready mechanism in the future, or say + * allow for multiple save files, fully validating the file before + * applying any changes will become much more important. + * + * Of course, this approach does have one major disadvantage, which is + * that we'll temporarily use about twice as much memory as we're + * ultimately going to need, but that seems like it shouldn't be a problem + * in practice. If there's so much stashed advice that parsing the disk + * file runs us out of memory, something has gone terribly wrong. In that + * situation, there probably also isn't enough free memory for the + * workload that the advice is attempting to manipulate to run + * successfully. + */ + filebuf = palloc_extended(statbuf.st_size + 1, MCXT_ALLOC_HUGE); + nread = fread(filebuf, 1, statbuf.st_size, file); + if (ferror(file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", PGSA_DUMP_FILE))); + FreeFile(file); + filebuf[nread] = '\0'; + + /* Initial memory allocations. */ + saved_stashes = pgsa_saved_stash_table_create(tmpcxt, 64, NULL); + entries = palloc(max_entries * sizeof(pgsa_saved_entry)); + + /* + * For memory and CPU efficiency, we parse the file in place. The end of + * each line gets replaced with a NUL byte, and then the end of each field + * within a line gets the same treatment. The advice string is unescaped + * in place, and stash names and query IDs can't contain any special + * characters. All of the resulting pointers point right back into the + * buffer; we only need additional memory to grow the 'entries' array and + * the 'saved_stashes' hash table. + */ + for (p = filebuf, lineno = 1; *p != '\0'; lineno++) + { + char *cursor = p; + char *eol; + char *line_type; + + /* Find end of line and NUL-terminate. */ + eol = strchr(p, '\n'); + if (eol != NULL) + { + *eol = '\0'; + p = eol + 1; + if (eol > cursor && eol[-1] == '\r') + eol[-1] = '\0'; + } + else + p += strlen(p); + + /* Skip empty lines. */ + if (*cursor == '\0') + continue; + + /* First field is the type of line, either "stash" or "entry". */ + line_type = pgsa_next_tsv_field(&cursor); + if (strcmp(line_type, "stash") == 0) + { + char *name; + bool found; + + /* Second field should be the stash name. */ + name = pgsa_next_tsv_field(&cursor); + if (name == NULL || *name == '\0') + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: expected stash name", + PGSA_DUMP_FILE, lineno))); + + /* No further fields are expected. */ + if (*cursor != '\0') + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: expected end of line", + PGSA_DUMP_FILE, lineno))); + + /* Duplicate check. */ + (void) pgsa_saved_stash_table_insert(saved_stashes, name, &found); + if (found) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: duplicate stash name \"%s\"", + PGSA_DUMP_FILE, lineno, name))); + num_stashes++; + } + else if (strcmp(line_type, "entry") == 0) + { + char *stash_name; + char *queryid_str; + char *advice_str; + char *endptr; + int64 queryId; + + /* Second field should be the stash name. */ + stash_name = pgsa_next_tsv_field(&cursor); + if (stash_name == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: expected stash name", + PGSA_DUMP_FILE, lineno))); + + /* Third field should be the query ID. */ + queryid_str = pgsa_next_tsv_field(&cursor); + if (queryid_str == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: expected query ID", + PGSA_DUMP_FILE, lineno))); + + /* Fourth field should be the advice string. */ + advice_str = pgsa_next_tsv_field(&cursor); + if (advice_str == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: expected advice string", + PGSA_DUMP_FILE, lineno))); + + /* No further fields are expected. */ + if (*cursor != '\0') + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: expected end of line", + PGSA_DUMP_FILE, lineno))); + + /* Make sure the stash is one we've actually seen. */ + if (pgsa_saved_stash_table_lookup(saved_stashes, + stash_name) == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: unknown stash \"%s\"", + PGSA_DUMP_FILE, lineno, stash_name))); + + /* Parse the query ID. */ + errno = 0; + queryId = strtoll(queryid_str, &endptr, 10); + if (*endptr != '\0' || errno != 0 || queryid_str == endptr || + queryId == 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: invalid query ID \"%s\"", + PGSA_DUMP_FILE, lineno, queryid_str))); + + /* Unescape the advice string. */ + pgsa_unescape_tsv_field(advice_str, PGSA_DUMP_FILE, lineno); + + /* Append to the entry array. */ + if (num_entries >= max_entries) + { + max_entries *= 2; + entries = repalloc(entries, + max_entries * sizeof(pgsa_saved_entry)); + } + entries[num_entries].stash_name = stash_name; + entries[num_entries].queryId = queryId; + entries[num_entries].advice_string = advice_str; + num_entries++; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: unrecognized line type", + PGSA_DUMP_FILE, lineno))); + } + } + + /* + * Parsing succeeded. Apply everything to shared memory. + * + * At this point, we know that the file we just read is fully valid, but + * it's still possible for this to fail if, for example, DSA memory cannot + * be allocated. If that happens, the worker will die, the postmaster will + * eventually restart it, and we'll try again after clearing any data that + * we did manage to put into shared memory. (Note that we call + * pgsa_reset_all_stashes() at the top of this function.) + */ + pgsa_restore_stashes(saved_stashes); + pgsa_restore_entries(entries, num_entries); + + /* Hooray, it worked! Notify the user. */ + ereport(LOG, + (errmsg("loaded %d advice stashes and %d entries from \"%s\"", + num_stashes, num_entries, PGSA_DUMP_FILE))); + + /* Clean up. */ + MemoryContextSwitchTo(oldcxt); + MemoryContextDelete(tmpcxt); +} + +/* + * Write all advice stash data to disk. + * + * The file format is a simple TSV with a line-type prefix: + * stash\tstash_name + * entry\tstash_name\tquery_id\tadvice_string + */ +static void +pgsa_write_to_disk(void) +{ + pgsa_writer_context wctx = {0}; + MemoryContext tmpcxt; + MemoryContext oldcxt; + + Assert(pgsa_entry_dshash != NULL); + + /* Use a temporary context so all allocations are freed at the end. */ + tmpcxt = AllocSetContextCreate(CurrentMemoryContext, + "pg_stash_advice dump", + ALLOCSET_DEFAULT_SIZES); + oldcxt = MemoryContextSwitchTo(tmpcxt); + + /* Set up the writer context. */ + snprintf(wctx.pathname, MAXPGPATH, "%s.tmp", PGSA_DUMP_FILE); + wctx.file = AllocateFile(wctx.pathname, "w"); + if (!wctx.file) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", wctx.pathname))); + wctx.nhash = pgsa_stash_name_table_create(tmpcxt, 64, NULL); + initStringInfo(&wctx.buf); + + /* Write stash lines, then entry lines. */ + pgsa_write_stashes(&wctx); + pgsa_write_entries(&wctx); + + /* + * If nothing was written, remove both the temp file and any existing dump + * file rather than installing a zero-length file. + */ + if (wctx.nhash->members == 0) + { + ereport(DEBUG1, + errmsg("there are no advice stashes to save")); + FreeFile(wctx.file); + unlink(wctx.pathname); + if (unlink(PGSA_DUMP_FILE) == 0) + ereport(DEBUG1, + errmsg("removed \"%s\"", PGSA_DUMP_FILE)); + } + else + { + if (FreeFile(wctx.file) != 0) + { + int save_errno = errno; + + unlink(wctx.pathname); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + wctx.pathname))); + } + (void) durable_rename(wctx.pathname, PGSA_DUMP_FILE, ERROR); + + ereport(LOG, + errmsg("saved %d advice stashes and %d entries to \"%s\"", + (int) wctx.nhash->members, wctx.entries_written, + PGSA_DUMP_FILE)); + } + + MemoryContextSwitchTo(oldcxt); + MemoryContextDelete(tmpcxt); +} + +/* + * Append the TSV-escaped form of str to buf. + * + * Backslash, tab, newline, and carriage return are escaped with backslash + * sequences. All other characters are passed through unchanged. + */ +static void +pgsa_append_tsv_escaped_string(StringInfo buf, const char *str) +{ + for (const char *p = str; *p != '\0'; p++) + { + switch (*p) + { + case '\\': + appendStringInfoString(buf, "\\\\"); + break; + case '\t': + appendStringInfoString(buf, "\\t"); + break; + case '\n': + appendStringInfoString(buf, "\\n"); + break; + case '\r': + appendStringInfoString(buf, "\\r"); + break; + default: + appendStringInfoChar(buf, *p); + break; + } + } +} + +/* + * Extract the next tab-delimited field from *cursor. + * + * The tab delimiter is replaced with '\0' and *cursor is advanced past it. + * If *cursor already points to '\0' (no more fields), returns NULL. + */ +static char * +pgsa_next_tsv_field(char **cursor) +{ + char *start = *cursor; + char *p = start; + + if (*p == '\0') + return NULL; + + while (*p != '\0' && *p != '\t') + p++; + + if (*p == '\t') + *p++ = '\0'; + + *cursor = p; + return start; +} + +/* + * Insert entries into shared memory from the parsed entry array. + */ +static void +pgsa_restore_entries(pgsa_saved_entry *entries, int num_entries) +{ + LWLockAcquire(&pgsa_state->lock, LW_SHARED); + for (int i = 0; i < num_entries; i++) + { + ereport(DEBUG2, + errmsg("restoring advice stash entry for \"%s\", query ID %" PRId64, + entries[i].stash_name, entries[i].queryId)); + pgsa_set_advice_string(entries[i].stash_name, + entries[i].queryId, + entries[i].advice_string); + } + LWLockRelease(&pgsa_state->lock); +} + +/* + * Create stashes in shared memory from the parsed stash hash table. + */ +static void +pgsa_restore_stashes(pgsa_saved_stash_table_hash *saved_stashes) +{ + pgsa_saved_stash_table_iterator iter; + pgsa_saved_stash *s; + + LWLockAcquire(&pgsa_state->lock, LW_EXCLUSIVE); + pgsa_saved_stash_table_start_iterate(saved_stashes, &iter); + while ((s = pgsa_saved_stash_table_iterate(saved_stashes, + &iter)) != NULL) + { + ereport(DEBUG2, + errmsg("restoring advice stash \"%s\"", s->name)); + pgsa_create_stash(s->name); + } + LWLockRelease(&pgsa_state->lock); +} + +/* + * Unescape a TSV field in place. + * + * Recognized escape sequences are \\, \t, \n, and \r. A trailing backslash + * or an unrecognized escape sequence is a syntax error. + */ +static void +pgsa_unescape_tsv_field(char *str, const char *filename, unsigned lineno) +{ + char *src = str; + char *dst = str; + + while (*src != '\0') + { + /* Just pass through anything that's not a backslash-escape. */ + if (likely(*src != '\\')) + { + *dst++ = *src++; + continue; + } + + /* Check what sort of escape we've got. */ + switch (src[1]) + { + case '\\': + *dst++ = '\\'; + break; + case 't': + *dst++ = '\t'; + break; + case 'n': + *dst++ = '\n'; + break; + case 'r': + *dst++ = '\r'; + break; + case '\0': + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: trailing backslash", + filename, lineno))); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("syntax error in file \"%s\" line %u: unrecognized escape \"\\%c\"", + filename, lineno, src[1]))); + break; + } + + /* We consumed the backslash and the following character. */ + src += 2; + } + *dst = '\0'; +} + +/* + * Write an entry line for each advice entry. + */ +static void +pgsa_write_entries(pgsa_writer_context *wctx) +{ + dshash_seq_status iter; + pgsa_entry *entry; + + dshash_seq_init(&iter, pgsa_entry_dshash, false); + while ((entry = dshash_seq_next(&iter)) != NULL) + { + pgsa_stash_name *n; + char *advice_string; + + if (entry->advice_string == InvalidDsaPointer) + continue; + + n = pgsa_stash_name_table_lookup(wctx->nhash, + entry->key.pgsa_stash_id); + if (n == NULL) + continue; /* orphan entry, skip */ + + advice_string = dsa_get_address(pgsa_dsa_area, entry->advice_string); + + resetStringInfo(&wctx->buf); + appendStringInfo(&wctx->buf, "entry\t%s\t%" PRId64 "\t", + n->name, entry->key.queryId); + pgsa_append_tsv_escaped_string(&wctx->buf, advice_string); + appendStringInfoChar(&wctx->buf, '\n'); + fwrite(wctx->buf.data, 1, wctx->buf.len, wctx->file); + if (ferror(wctx->file)) + pgsa_write_error(wctx); + wctx->entries_written++; + } + dshash_seq_term(&iter); +} + +/* + * Clean up and report a write error. Does not return. + */ +static void +pgsa_write_error(pgsa_writer_context *wctx) +{ + int save_errno = errno; + + FreeFile(wctx->file); + unlink(wctx->pathname); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", wctx->pathname))); +} + +/* + * Write a stash line for each advice stash, and populate the ID-to-name + * hash table for use by pgsa_write_entries. + */ +static void +pgsa_write_stashes(pgsa_writer_context *wctx) +{ + dshash_seq_status iter; + pgsa_stash *stash; + + dshash_seq_init(&iter, pgsa_stash_dshash, false); + while ((stash = dshash_seq_next(&iter)) != NULL) + { + pgsa_stash_name *n; + bool found; + + n = pgsa_stash_name_table_insert(wctx->nhash, stash->pgsa_stash_id, + &found); + Assert(!found); + n->name = pstrdup(stash->name); + + resetStringInfo(&wctx->buf); + appendStringInfo(&wctx->buf, "stash\t%s\n", n->name); + fwrite(wctx->buf.data, 1, wctx->buf.len, wctx->file); + if (ferror(wctx->file)) + pgsa_write_error(wctx); + } + dshash_seq_term(&iter); +} diff --git a/contrib/pg_stash_advice/t/001_persist.pl b/contrib/pg_stash_advice/t/001_persist.pl new file mode 100644 index 00000000000..d1466166602 --- /dev/null +++ b/contrib/pg_stash_advice/t/001_persist.pl @@ -0,0 +1,84 @@ + +# Copyright (c) 2016-2026, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('main'); + +$node->init; +$node->append_conf( + 'postgresql.conf', + qq{shared_preload_libraries = 'pg_plan_advice, pg_stash_advice' +pg_stash_advice.persist = true +pg_stash_advice.persist_interval = 0}); +$node->start; + +$node->safe_psql("postgres", + "CREATE EXTENSION pg_stash_advice;\n"); + +# Create two stashes: one with 2 entries, one with 1 entry. +$node->safe_psql("postgres", qq{ + SELECT pg_create_advice_stash('stash_a'); + SELECT pg_set_stashed_advice('stash_a', 1001, 'IndexScan(t)'); + SELECT pg_set_stashed_advice('stash_a', 1002, E'line1\\nline2\\ttab\\\\backslash'); + SELECT pg_create_advice_stash('stash_b'); + SELECT pg_set_stashed_advice('stash_b', 2001, 'SeqScan(t)'); +}); + +# Verify before restart. +my $result = $node->safe_psql("postgres", + "SELECT stash_name, num_entries FROM pg_get_advice_stashes() ORDER BY stash_name"); +is($result, "stash_a|2\nstash_b|1", 'stashes present before restart'); + +# Restart and verify the data survived. +$node->restart; +$node->wait_for_log("loaded 2 advice stashes and 3 entries"); + +$result = $node->safe_psql("postgres", + "SELECT stash_name, num_entries FROM pg_get_advice_stashes() ORDER BY stash_name"); +is($result, "stash_a|2\nstash_b|1", 'stashes survived restart'); + +# Verify entry contents, including the one with special characters. +$result = $node->safe_psql("postgres", + "SELECT stash_name, query_id, advice_string FROM pg_get_advice_stash_contents(NULL) ORDER BY stash_name, query_id"); +is($result, + "stash_a|1001|IndexScan(t)\nstash_a|1002|line1\nline2\ttab\\backslash\nstash_b|2001|SeqScan(t)", + 'entry contents survived restart with special characters intact'); + +# Add a third stash with 0 entries. +$node->safe_psql("postgres", qq{ + SELECT pg_create_advice_stash('stash_c'); +}); + +# Restart again and verify all three stashes are present. +$node->restart; +$node->wait_for_log("loaded 3 advice stashes and 3 entries"); + +$result = $node->safe_psql("postgres", + "SELECT stash_name, num_entries FROM pg_get_advice_stashes() ORDER BY stash_name"); +is($result, "stash_a|2\nstash_b|1\nstash_c|0", 'all three stashes survived second restart'); + +# Drop all stashes and verify the dump file is removed after restart. +$node->safe_psql("postgres", qq{ + SELECT pg_drop_advice_stash('stash_a'); + SELECT pg_drop_advice_stash('stash_b'); + SELECT pg_drop_advice_stash('stash_c'); +}); + +$node->restart; + +$result = $node->safe_psql("postgres", + "SELECT count(*) FROM pg_get_advice_stashes()"); +is($result, "0", 'no stashes after dropping all and restarting'); + +ok(!-f $node->data_dir . '/pg_stash_advice.tsv', + 'dump file removed after all stashes dropped'); + +$node->stop; + +done_testing(); diff --git a/doc/src/sgml/pgstashadvice.sgml b/doc/src/sgml/pgstashadvice.sgml index ec60552a447..810787fe814 100644 --- a/doc/src/sgml/pgstashadvice.sgml +++ b/doc/src/sgml/pgstashadvice.sgml @@ -15,10 +15,12 @@ query identifiers to plan advice strings. Whenever a session is asked to plan a query whose query ID appears in the relevant advice stash, the plan advice string is automatically applied - to guide planning. Note that advice stashes exist purely in memory. This - means both that it is important to be mindful of memory consumption when - deciding how much plan advice to stash, and also that advice stashes must - be recreated and repopulated whenever the server is restarted. + to guide planning. Note that advice stashes are stored in dynamically + allocated shared memory. This means both that it is important to be mindful + of memory consumption when deciding how much plan advice to stash. + Optionally, advice stashes and their contents can automatically be persisted + to disk and reloaded from disk; see + pg_stash_advice.persist, below. @@ -175,6 +177,28 @@ + + + pg_start_stash_advice_worker() returns void + + pg_start_stash_advice_worker + + + + + + Starts the background worker, so that advice stash contents can be + automatically persisted to disk. If this module is included in + at startup time with + pg_stash_advice.persist = true, the worker will be + started automatically. When started manually, the worker will not load + anything from disk, but it will still persist data to disk. You can then + configure the server to start the worker automatically after the next + restart, preserving any stashed advice you add now. + + + + @@ -184,6 +208,44 @@ + + + pg_stash_advice.persist (boolean) + + pg_stash_advice.persist configuration parameter + + + + + + Controls whether the advice stashes and stash entries should be + persisted to disk. This is on by default. If any stashes are persisted, + a file named pg_stash_advice.tsv will be created in + the data directory. Stashes are loaded and saved using a background + worker process. This parameter can only be set at server start. + + + + + + + pg_stash_advice.persist_interval (integer) + + pg_stash_advice.persist_interval configuration parameter + + + + + + Specifies the interval, in seconds, between checks for changes that + need to be written to pg_stash_advice.tsv. If set to + zero, changes are only written when the server shuts down. The default + value is 30. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + + pg_stash_advice.stash_name (string) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9e6a39f5608..637c669a146 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -4087,10 +4087,14 @@ pgpa_trove_slice pgpa_unrolled_join pgsa_entry pgsa_entry_key +pgsa_saved_entry +pgsa_saved_stash +pgsa_saved_stash_table_hash pgsa_shared_state pgsa_stash pgsa_stash_count pgsa_stash_name +pgsa_writer_context pgsocket pgsql_thing_t pgssEntry