diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index db6a9a6561b..eee8bc29f38 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -114,15 +114,15 @@ * If the current transaction has executed any LISTEN/UNLISTEN actions, * PreCommit_Notify() prepares to commit those. For LISTEN, it * pre-allocates entries in both the per-backend localChannelTable and the - * shared globalChannelTable (with listening=false so that these entries - * are no-ops for the moment). It also records the final per-channel - * intent in pendingListenActions, so post-commit/abort processing can - * apply that in a single step. Since all these allocations happen before - * committing to clog, we can safely abort the transaction on failure. + * shared globalChannelTable, marking new shared entries removeOnAbort. + * It also records the final per-channel intent in pendingListenActions, + * so post-commit/abort processing can apply that in a single step. + * Since all these allocations happen before committing to clog, we can + * safely abort the transaction on failure. * * After commit, AtCommit_Notify() runs through pendingListenActions and - * updates the backend's per-channel listening flags to activate or - * deactivate listening. This happens before sending signals. + * applies the final per-channel listen/unlisten state. This happens + * before sending signals. * * SignalBackends() consults the shared global channel table to identify * listeners for the channels that the current transaction sent @@ -385,7 +385,7 @@ static SlruDesc NotifySlruDesc; * * This hash table maps (database OID, channel name) keys to arrays of * ProcNumbers representing the backends listening or about to listen - * on each channel. The "listening" flags allow us to create hash table + * on each channel. The removeOnAbort flags allow us to create hash table * entries pre-commit and not have to assume that creating them post-commit * will succeed. */ @@ -400,7 +400,7 @@ typedef struct GlobalChannelKey typedef struct ListenerEntry { ProcNumber procNo; /* listener's ProcNumber */ - bool listening; /* true if committed listener */ + bool removeOnAbort; /* remove entry if current xact aborts */ } ListenerEntry; typedef struct GlobalChannelEntry @@ -1523,9 +1523,8 @@ BecomeRegisteredListener(void) * * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating * an entry in localChannelTable, and pre-allocating an entry in the shared - * globalChannelTable with listening=false. The listening flag will be set - * to true in AtCommit_Notify. If we abort later, unwanted table entries - * will be removed. + * globalChannelTable with removeOnAbort set. AtCommit_Notify will clear + * removeOnAbort; abort processing will remove entries still marked so. */ static void PrepareTableEntriesForListen(const char *channel) @@ -1557,7 +1556,7 @@ PrepareTableEntriesForListen(const char *channel) */ (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL); - /* Pre-allocate entry in shared globalChannelTable with listening=false */ + /* Pre-allocate entry in shared globalChannelTable */ GlobalChannelKeyInit(&key, MyDatabaseId, channel); entry = dshash_find_or_insert(globalChannelTable, &key, &found); @@ -1592,7 +1591,7 @@ PrepareTableEntriesForListen(const char *channel) { if (listeners[i].procNo == MyProcNumber) { - /* Already have an entry; listening flag stays as-is until commit */ + /* Already have an entry; leave removeOnAbort as-is */ dshash_release_lock(globalChannelTable, entry); return; } @@ -1615,8 +1614,7 @@ PrepareTableEntriesForListen(const char *channel) } listeners[entry->numListeners].procNo = MyProcNumber; - listeners[entry->numListeners].listening = false; /* staged, not yet - * committed */ + listeners[entry->numListeners].removeOnAbort = true; entry->numListeners++; dshash_release_lock(globalChannelTable, entry); @@ -1766,11 +1764,11 @@ ApplyPendingListenActions(bool isCommit) if (pending->action == PENDING_LISTEN) { /* - * LISTEN being committed: set listening=true. + * LISTEN being committed: entry is now permanent. * localChannelTable entry was created during * PreCommit and should be kept. */ - listeners[i].listening = true; + listeners[i].removeOnAbort = false; removeLocal = false; } else @@ -1790,20 +1788,19 @@ ApplyPendingListenActions(bool isCommit) * pendingListenActions entries, so it's pretty hard to * test. */ - if (!listeners[i].listening) + if (listeners[i].removeOnAbort) { /* * Staged LISTEN (or LISTEN+UNLISTEN) being aborted, - * and we weren't listening before, so remove - * pre-allocated entries from both tables. + * so remove pre-allocated entries from both tables. */ RemoveListenerFromChannel(&entry, listeners, i); } else { /* - * We're aborting, but the previous state was that - * we're listening, so keep localChannelTable entry. + * Entry predates this transaction, so keep the + * localChannelTable entry. */ removeLocal = false; } @@ -2297,18 +2294,23 @@ SignalBackends(void) listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, entry->listenersArray); - /* Identify listeners that now need waking, add them to arrays */ + /* + * Identify listeners that now need waking, add them to arrays. + * + * Note that we signal listeners regardless of the state of their + * removeOnAbort flags. Hence a new listener that reached PreCommit, + * but then failed before AtCommit_Notify, can receive a signal even + * though it was never really listening. This is okay because it will + * not do anything in response to that signal. If we did not do it + * like this then a new listener might miss some messages due to the + * direct-advance logic below. + */ for (int j = 0; j < entry->numListeners; j++) { - ProcNumber i; + ProcNumber i = listeners[j].procNo; int32 pid; QueuePosition pos; - if (!listeners[j].listening) - continue; /* ignore not-yet-committed listeners */ - - i = listeners[j].procNo; - if (QUEUE_BACKEND_WAKEUP_PENDING(i)) continue; /* already signaled, no need to repeat */ diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 5d6bcce2b02..55b7cbc6e02 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,4 +1,4 @@ -Parsed test spec with 7 sessions +Parsed test spec with 6 sessions starting permutation: listenc notify1 notify2 notify3 notifyf step listenc: LISTEN c1; LISTEN c2; @@ -149,13 +149,6 @@ notifier: NOTIFY "c1" with payload "msg15" from notifier notifier: NOTIFY "c1" with payload "msg16" from notifier notifier: NOTIFY "c1" with payload "msg17" from notifier -starting permutation: listenc llisten l2listen l3listen lslisten -step listenc: LISTEN c1; LISTEN c2; -step llisten: LISTEN c1; LISTEN c2; -step l2listen: LISTEN c1; -step l3listen: LISTEN c1; -step lslisten: LISTEN c1; LISTEN c2; - starting permutation: llisten notify1 notify2 notify3 notifyf lcheck step llisten: LISTEN c1; LISTEN c2; step notify1: NOTIFY c1; @@ -204,8 +197,6 @@ listener: NOTIFY "c2" with payload "" from notifier starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop step l2listen: LISTEN c1; -listener2: NOTIFY "c1" with payload "" from notifier -listener2: NOTIFY "c1" with payload "" from notifier step l2begin: BEGIN; step notify1: NOTIFY c1; step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE; diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index d09c2297f09..7aef2e8d180 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -68,12 +68,6 @@ step l2begin { BEGIN; } step l2commit { COMMIT; } step l2stop { UNLISTEN *; } -# Third listener session for testing array growth. - -session listener3 -step l3listen { LISTEN c1; } -teardown { UNLISTEN *; } - # Listener session for cross-session notification test with channel 'ch'. session listener_ch @@ -125,9 +119,6 @@ permutation lunlisten_all notify1 lcheck # Check notification_match function (triggered by hash table duplicate detection). permutation listenc notify_many_with_dup -# Check ChannelHashAddListener array growth. -permutation listenc llisten l2listen l3listen lslisten - # Cross-backend notification delivery. We use a "select 1" to force the # listener session to check for notifies. In principle we could just wait # for delivery, but that would require extra support in isolationtester