Use sequentially consistent ordering in the adaptive rwlock handshake

The adaptive isc_rwlock (the modified C-RW-WP variant) synchronizes a
reader against a writer through a store-buffer handshake across two
independent atomic objects: the reader publishes its arrival in
readers_ingress and then reads writers_lock, while the writer publishes
its lock in writers_lock and then reads the reader indicator. With the
acquire/release ordering introduced by the 2021 simplification, neither
side is forced to observe the other's publish store before its own check
load, so on weak-memory targets a reader could see writers_lock unlocked
while the writer sees the indicator empty, and both would enter their
critical sections at once.

Restore the sequentially consistent ordering the original algorithm
specifies on the handshake atomics. The single total order over the
seq_cst operations is what forbids the overlap; targeting individual
fences is both more fragile and, on x86, more expensive. On x86 this
ordering is free (seq_cst loads remain plain loads and the RMWs remain
lock-prefixed); the added cost falls only on the weak-memory targets that
actually need it.
This commit is contained in:
Ondřej Surý 2026-06-02 06:32:20 +02:00
parent edb03e2ee4
commit b11bf7a45e
No known key found for this signature in database
GPG key ID: 2820F37E873DEA41

View file

@ -82,54 +82,74 @@ read_indicator_wait_until_empty(isc_rwlock_t *rwl);
#include <stdio.h>
/*
* The reader/writer handshake below is the store-buffer pattern: each side
* publishes a store (the reader into readers_ingress, the writer into
* writers_lock) and then loads the other side's state. These are independent
* atomic objects, so acquire/release ordering does not order the publish store
* against the later check load, and a reader and a writer could both observe
* the other's slot as free and enter their critical sections at once. The
* operations therefore use memory_order_seq_cst, not acquire/release: the
* single total order over the seq_cst operations is what forbids that overlap.
* Do not weaken these to acquire/release -- doing so reintroduces #6060.
*/
static void
read_indicator_arrive(isc_rwlock_t *rwl) {
(void)atomic_fetch_add_release(&rwl->readers_ingress, 1);
(void)atomic_fetch_add_explicit(&rwl->readers_ingress, 1,
memory_order_seq_cst);
}
static void
read_indicator_depart(isc_rwlock_t *rwl) {
(void)atomic_fetch_add_release(&rwl->readers_egress, 1);
(void)atomic_fetch_add_explicit(&rwl->readers_egress, 1,
memory_order_seq_cst);
}
static bool
read_indicator_isempty(isc_rwlock_t *rwl) {
return atomic_load_acquire(&rwl->readers_egress) ==
atomic_load_acquire(&rwl->readers_ingress);
return atomic_load_explicit(&rwl->readers_egress,
memory_order_seq_cst) ==
atomic_load_explicit(&rwl->readers_ingress,
memory_order_seq_cst);
}
static void
writers_barrier_raise(isc_rwlock_t *rwl) {
(void)atomic_fetch_add_release(&rwl->writers_barrier, 1);
(void)atomic_fetch_add_explicit(&rwl->writers_barrier, 1,
memory_order_seq_cst);
}
static void
writers_barrier_lower(isc_rwlock_t *rwl) {
(void)atomic_fetch_sub_release(&rwl->writers_barrier, 1);
(void)atomic_fetch_sub_explicit(&rwl->writers_barrier, 1,
memory_order_seq_cst);
}
static bool
writers_barrier_israised(isc_rwlock_t *rwl) {
return atomic_load_acquire(&rwl->writers_barrier) > 0;
return atomic_load_explicit(&rwl->writers_barrier,
memory_order_seq_cst) > 0;
}
static bool
writers_lock_islocked(isc_rwlock_t *rwl) {
return atomic_load_acquire(&rwl->writers_lock) == ISC_RWLOCK_LOCKED;
return atomic_load_explicit(&rwl->writers_lock, memory_order_seq_cst) ==
ISC_RWLOCK_LOCKED;
}
static bool
writers_lock_acquire(isc_rwlock_t *rwl) {
return atomic_compare_exchange_weak_acq_rel(
return atomic_compare_exchange_weak_explicit(
&rwl->writers_lock, &(bool){ ISC_RWLOCK_UNLOCKED },
ISC_RWLOCK_LOCKED);
ISC_RWLOCK_LOCKED, memory_order_seq_cst, memory_order_seq_cst);
}
static void
writers_lock_release(isc_rwlock_t *rwl) {
REQUIRE(atomic_compare_exchange_strong_acq_rel(
REQUIRE(atomic_compare_exchange_strong_explicit(
&rwl->writers_lock, &(bool){ ISC_RWLOCK_LOCKED },
ISC_RWLOCK_UNLOCKED));
ISC_RWLOCK_UNLOCKED, memory_order_seq_cst,
memory_order_seq_cst));
}
#define ran_out_of_patience(cnt) (cnt >= RWLOCK_MAX_READER_PATIENCE)
@ -143,6 +163,7 @@ isc_rwlock_rdlock(isc_rwlock_t *rwl) {
while (true) {
read_indicator_arrive(rwl);
if (!writers_lock_islocked(rwl)) {
/* Acquired lock in read-only mode */
break;
@ -169,6 +190,7 @@ isc_rwlock_rdlock(isc_rwlock_t *rwl) {
isc_result_t
isc_rwlock_tryrdlock(isc_rwlock_t *rwl) {
read_indicator_arrive(rwl);
if (writers_lock_islocked(rwl)) {
/* Writer has acquired the lock, release the read lock */
read_indicator_depart(rwl);