diff --git a/src/peers.c b/src/peers.c index 9055760d5..c7c28b1ee 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1608,9 +1608,15 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, st->last_pushed = updateid; - if (peer_stksess_lookup == peer_teach_process_stksess_lookup && - (int)(st->last_pushed - st->table->commitupdate) > 0) - st->table->commitupdate = st->last_pushed; + if (peer_stksess_lookup == peer_teach_process_stksess_lookup) { + uint commitid = _HA_ATOMIC_LOAD(&st->table->commitupdate); + + while ((int)(updateid - commitid) > 0) { + if (_HA_ATOMIC_CAS(&st->table->commitupdate, &commitid, updateid)) + break; + __ha_cpu_relax(); + } + } /* identifier may not needed in next update message */ new_pushed = 0; @@ -2802,6 +2808,8 @@ static inline void init_accepted_peer(struct peer *peer, struct peers *peers) /* Init cursors */ for (st = peer->tables; st ; st = st->next) { + uint commitid, updateid; + st->last_get = st->last_acked = 0; HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->lock); /* if st->update appears to be in future it means @@ -2817,8 +2825,16 @@ static inline void init_accepted_peer(struct peer *peer, struct peers *peers) st->update = st->table->localupdate + (2147483648U); st->teaching_origin = st->last_pushed = st->update; st->flags = 0; - if ((int)(st->last_pushed - st->table->commitupdate) > 0) - st->table->commitupdate = st->last_pushed; + + updateid = st->last_pushed; + commitid = _HA_ATOMIC_LOAD(&st->table->commitupdate); + + while ((int)(updateid - commitid) > 0) { + if (_HA_ATOMIC_CAS(&st->table->commitupdate, &commitid, updateid)) + break; + __ha_cpu_relax(); + } + HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->lock); } @@ -2857,6 +2873,8 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers) peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); /* Init cursors */ for (st = peer->tables; st ; st = st->next) { + uint updateid, commitid; + st->last_get = st->last_acked = 0; HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->lock); /* if st->update appears to be in future it means @@ -2872,8 +2890,16 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers) st->update = st->table->localupdate + (2147483648U); st->teaching_origin = st->last_pushed = st->update; st->flags = 0; - if ((int)(st->last_pushed - st->table->commitupdate) > 0) - st->table->commitupdate = st->last_pushed; + + updateid = st->last_pushed; + commitid = _HA_ATOMIC_LOAD(&st->table->commitupdate); + + while ((int)(updateid - commitid) > 0) { + if (_HA_ATOMIC_CAS(&st->table->commitupdate, &commitid, updateid)) + break; + __ha_cpu_relax(); + } + HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->lock); } @@ -3963,7 +3989,7 @@ static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct pee st->teaching_origin, st->update); chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u" " commitupdate=%u refcnt=%u", - t, t->id, t->update, t->localupdate, t->commitupdate, t->refcnt); + t, t->id, t->update, t->localupdate, _HA_ATOMIC_LOAD(&t->commitupdate), t->refcnt); if (flags & PEERS_SHOW_F_DICT) { chunk_appendf(&trash, "\n TX dictionary cache:"); count = 0;