mirror of
https://github.com/postgres/postgres.git
synced 2026-05-28 04:35:45 -04:00
Fix NOTIFY wakeups for pre-commit LISTEN entries.
Commit282b1cde9made SignalBackends() ignore ListenerEntry entries whose "listening" flag said that the listener was not yet committed. That will be true for a new listener that has already registered its queue position, but has not yet reached AtCommit_Notify(). If another backend notifies the same channel in that window, SignalBackends() would directly advance the new listener's queue position, causing it to miss message(s). Really this is a definitional question: is a new listener active as of PreCommit, or as of AtCommit? But it seems to make more sense to expect that the new listener will see all messages after its initially-registered queue position, especially since the direct-advance logic is supposed to be an optimization that doesn't affect semantics. Fix this by treating all channel entries as valid wakeup targets. Rename the "listening" flag to removeOnAbort to reflect its remaining purpose: identifying staged LISTEN entries that abort cleanup must remove. While we're here, remove an obsolete test case added by282b1cde9. The check for "ChannelHashAddListener array growth" was meant to exercise code that never made it into the committed patch, so now it's just a waste of test cycles. Author: Joel Jacobson <joel@compiler.org> Reviewed-by: Arseniy Mukhin <arseniy.mukhin.dev@gmail.com> Reviewed-by: Tom Lane <tgl@sss.pgh.pa.us> Discussion: https://postgr.es/m/9835b0a4-9121-47ac-9c44-427b8b1a7f1b@app.fastmail.com Discussion: https://postgr.es/m/6fe5ee75-537d-4d4f-909a-b21303c3ce75@app.fastmail.com
This commit is contained in:
parent
2fbb21170e
commit
0f24332aeb
3 changed files with 33 additions and 49 deletions
|
|
@ -114,15 +114,15 @@
|
|||
* If the current transaction has executed any LISTEN/UNLISTEN actions,
|
||||
* PreCommit_Notify() prepares to commit those. For LISTEN, it
|
||||
* pre-allocates entries in both the per-backend localChannelTable and the
|
||||
* shared globalChannelTable (with listening=false so that these entries
|
||||
* are no-ops for the moment). It also records the final per-channel
|
||||
* intent in pendingListenActions, so post-commit/abort processing can
|
||||
* apply that in a single step. Since all these allocations happen before
|
||||
* committing to clog, we can safely abort the transaction on failure.
|
||||
* shared globalChannelTable, marking new shared entries removeOnAbort.
|
||||
* It also records the final per-channel intent in pendingListenActions,
|
||||
* so post-commit/abort processing can apply that in a single step.
|
||||
* Since all these allocations happen before committing to clog, we can
|
||||
* safely abort the transaction on failure.
|
||||
*
|
||||
* After commit, AtCommit_Notify() runs through pendingListenActions and
|
||||
* updates the backend's per-channel listening flags to activate or
|
||||
* deactivate listening. This happens before sending signals.
|
||||
* applies the final per-channel listen/unlisten state. This happens
|
||||
* before sending signals.
|
||||
*
|
||||
* SignalBackends() consults the shared global channel table to identify
|
||||
* listeners for the channels that the current transaction sent
|
||||
|
|
@ -385,7 +385,7 @@ static SlruDesc NotifySlruDesc;
|
|||
*
|
||||
* This hash table maps (database OID, channel name) keys to arrays of
|
||||
* ProcNumbers representing the backends listening or about to listen
|
||||
* on each channel. The "listening" flags allow us to create hash table
|
||||
* on each channel. The removeOnAbort flags allow us to create hash table
|
||||
* entries pre-commit and not have to assume that creating them post-commit
|
||||
* will succeed.
|
||||
*/
|
||||
|
|
@ -400,7 +400,7 @@ typedef struct GlobalChannelKey
|
|||
typedef struct ListenerEntry
|
||||
{
|
||||
ProcNumber procNo; /* listener's ProcNumber */
|
||||
bool listening; /* true if committed listener */
|
||||
bool removeOnAbort; /* remove entry if current xact aborts */
|
||||
} ListenerEntry;
|
||||
|
||||
typedef struct GlobalChannelEntry
|
||||
|
|
@ -1523,9 +1523,8 @@ BecomeRegisteredListener(void)
|
|||
*
|
||||
* Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
|
||||
* an entry in localChannelTable, and pre-allocating an entry in the shared
|
||||
* globalChannelTable with listening=false. The listening flag will be set
|
||||
* to true in AtCommit_Notify. If we abort later, unwanted table entries
|
||||
* will be removed.
|
||||
* globalChannelTable with removeOnAbort set. AtCommit_Notify will clear
|
||||
* removeOnAbort; abort processing will remove entries still marked so.
|
||||
*/
|
||||
static void
|
||||
PrepareTableEntriesForListen(const char *channel)
|
||||
|
|
@ -1557,7 +1556,7 @@ PrepareTableEntriesForListen(const char *channel)
|
|||
*/
|
||||
(void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
|
||||
|
||||
/* Pre-allocate entry in shared globalChannelTable with listening=false */
|
||||
/* Pre-allocate entry in shared globalChannelTable */
|
||||
GlobalChannelKeyInit(&key, MyDatabaseId, channel);
|
||||
entry = dshash_find_or_insert(globalChannelTable, &key, &found);
|
||||
|
||||
|
|
@ -1592,7 +1591,7 @@ PrepareTableEntriesForListen(const char *channel)
|
|||
{
|
||||
if (listeners[i].procNo == MyProcNumber)
|
||||
{
|
||||
/* Already have an entry; listening flag stays as-is until commit */
|
||||
/* Already have an entry; leave removeOnAbort as-is */
|
||||
dshash_release_lock(globalChannelTable, entry);
|
||||
return;
|
||||
}
|
||||
|
|
@ -1615,8 +1614,7 @@ PrepareTableEntriesForListen(const char *channel)
|
|||
}
|
||||
|
||||
listeners[entry->numListeners].procNo = MyProcNumber;
|
||||
listeners[entry->numListeners].listening = false; /* staged, not yet
|
||||
* committed */
|
||||
listeners[entry->numListeners].removeOnAbort = true;
|
||||
entry->numListeners++;
|
||||
|
||||
dshash_release_lock(globalChannelTable, entry);
|
||||
|
|
@ -1766,11 +1764,11 @@ ApplyPendingListenActions(bool isCommit)
|
|||
if (pending->action == PENDING_LISTEN)
|
||||
{
|
||||
/*
|
||||
* LISTEN being committed: set listening=true.
|
||||
* LISTEN being committed: entry is now permanent.
|
||||
* localChannelTable entry was created during
|
||||
* PreCommit and should be kept.
|
||||
*/
|
||||
listeners[i].listening = true;
|
||||
listeners[i].removeOnAbort = false;
|
||||
removeLocal = false;
|
||||
}
|
||||
else
|
||||
|
|
@ -1790,20 +1788,19 @@ ApplyPendingListenActions(bool isCommit)
|
|||
* pendingListenActions entries, so it's pretty hard to
|
||||
* test.
|
||||
*/
|
||||
if (!listeners[i].listening)
|
||||
if (listeners[i].removeOnAbort)
|
||||
{
|
||||
/*
|
||||
* Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
|
||||
* and we weren't listening before, so remove
|
||||
* pre-allocated entries from both tables.
|
||||
* so remove pre-allocated entries from both tables.
|
||||
*/
|
||||
RemoveListenerFromChannel(&entry, listeners, i);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We're aborting, but the previous state was that
|
||||
* we're listening, so keep localChannelTable entry.
|
||||
* Entry predates this transaction, so keep the
|
||||
* localChannelTable entry.
|
||||
*/
|
||||
removeLocal = false;
|
||||
}
|
||||
|
|
@ -2297,18 +2294,23 @@ SignalBackends(void)
|
|||
listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
|
||||
entry->listenersArray);
|
||||
|
||||
/* Identify listeners that now need waking, add them to arrays */
|
||||
/*
|
||||
* Identify listeners that now need waking, add them to arrays.
|
||||
*
|
||||
* Note that we signal listeners regardless of the state of their
|
||||
* removeOnAbort flags. Hence a new listener that reached PreCommit,
|
||||
* but then failed before AtCommit_Notify, can receive a signal even
|
||||
* though it was never really listening. This is okay because it will
|
||||
* not do anything in response to that signal. If we did not do it
|
||||
* like this then a new listener might miss some messages due to the
|
||||
* direct-advance logic below.
|
||||
*/
|
||||
for (int j = 0; j < entry->numListeners; j++)
|
||||
{
|
||||
ProcNumber i;
|
||||
ProcNumber i = listeners[j].procNo;
|
||||
int32 pid;
|
||||
QueuePosition pos;
|
||||
|
||||
if (!listeners[j].listening)
|
||||
continue; /* ignore not-yet-committed listeners */
|
||||
|
||||
i = listeners[j].procNo;
|
||||
|
||||
if (QUEUE_BACKEND_WAKEUP_PENDING(i))
|
||||
continue; /* already signaled, no need to repeat */
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
Parsed test spec with 7 sessions
|
||||
Parsed test spec with 6 sessions
|
||||
|
||||
starting permutation: listenc notify1 notify2 notify3 notifyf
|
||||
step listenc: LISTEN c1; LISTEN c2;
|
||||
|
|
@ -149,13 +149,6 @@ notifier: NOTIFY "c1" with payload "msg15" from notifier
|
|||
notifier: NOTIFY "c1" with payload "msg16" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg17" from notifier
|
||||
|
||||
starting permutation: listenc llisten l2listen l3listen lslisten
|
||||
step listenc: LISTEN c1; LISTEN c2;
|
||||
step llisten: LISTEN c1; LISTEN c2;
|
||||
step l2listen: LISTEN c1;
|
||||
step l3listen: LISTEN c1;
|
||||
step lslisten: LISTEN c1; LISTEN c2;
|
||||
|
||||
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
|
||||
step llisten: LISTEN c1; LISTEN c2;
|
||||
step notify1: NOTIFY c1;
|
||||
|
|
@ -204,8 +197,6 @@ listener: NOTIFY "c2" with payload "" from notifier
|
|||
|
||||
starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
|
||||
step l2listen: LISTEN c1;
|
||||
listener2: NOTIFY "c1" with payload "" from notifier
|
||||
listener2: NOTIFY "c1" with payload "" from notifier
|
||||
step l2begin: BEGIN;
|
||||
step notify1: NOTIFY c1;
|
||||
step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE;
|
||||
|
|
|
|||
|
|
@ -68,12 +68,6 @@ step l2begin { BEGIN; }
|
|||
step l2commit { COMMIT; }
|
||||
step l2stop { UNLISTEN *; }
|
||||
|
||||
# Third listener session for testing array growth.
|
||||
|
||||
session listener3
|
||||
step l3listen { LISTEN c1; }
|
||||
teardown { UNLISTEN *; }
|
||||
|
||||
# Listener session for cross-session notification test with channel 'ch'.
|
||||
|
||||
session listener_ch
|
||||
|
|
@ -125,9 +119,6 @@ permutation lunlisten_all notify1 lcheck
|
|||
# Check notification_match function (triggered by hash table duplicate detection).
|
||||
permutation listenc notify_many_with_dup
|
||||
|
||||
# Check ChannelHashAddListener array growth.
|
||||
permutation listenc llisten l2listen l3listen lslisten
|
||||
|
||||
# Cross-backend notification delivery. We use a "select 1" to force the
|
||||
# listener session to check for notifies. In principle we could just wait
|
||||
# for delivery, but that would require extra support in isolationtester
|
||||
|
|
|
|||
Loading…
Reference in a new issue