diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 26afd8f0af9..661d68ad653 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1186,55 +1186,70 @@ replorigin_session_setup(ReplOriginId node, int acquired_by) if (curstate->roident != node) continue; - else if (curstate->acquired_by != 0 && acquired_by == 0) + if (acquired_by == 0) { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication origin with ID %d is already active for PID %d", - curstate->roident, curstate->acquired_by))); + /* With acquired_by == 0, we need the origin to be free */ + if (curstate->acquired_by != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication origin with ID %d is already active for PID %d", + curstate->roident, curstate->acquired_by))); + } + else if (curstate->refcount > 0) + { + /* + * The origin is in use, but PID is not recorded. This can + * happen if the process that originally acquired the origin + * exited without releasing it. To ensure correctness, other + * processes cannot acquire the origin until all processes + * currently using it have released it. + */ + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication origin with ID %d is already active in another process", + curstate->roident))); + } } - - else if (curstate->acquired_by != acquired_by) + else { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d", - node, acquired_by))); - } + /* + * With acquired_by != 0, we need the origin to be active by the + * given PID + */ + if (curstate->acquired_by != acquired_by) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication origin with ID %d is not active for PID %d", + curstate->roident, acquired_by))); - /* - * The origin is in use, but PID is not recorded. This can happen if - * the process that originally acquired the origin exited without - * releasing it. To ensure correctness, other processes cannot acquire - * the origin until all processes currently using it have released it. - */ - else if (curstate->acquired_by == 0 && curstate->refcount > 0) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication origin with ID %d is already active in another process", - curstate->roident))); + /* + * Here, it is okay to have refcount > 0 as more than one process + * can safely re-use the origin. + */ + } /* ok, found slot */ session_replication_state = curstate; break; } - - if (session_replication_state == NULL && free_slot == -1) - ereport(ERROR, - (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("could not find free replication state slot for replication origin with ID %d", - node), - errhint("Increase \"max_active_replication_origins\" and try again."))); - else if (session_replication_state == NULL) + if (session_replication_state == NULL) { - if (acquired_by) + if (acquired_by != 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use PID %d for inactive replication origin with ID %d", acquired_by, node))); /* initialize new slot */ + if (free_slot == -1) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("could not find free replication state slot for replication origin with ID %d", + node), + errhint("Increase \"max_active_replication_origins\" and try again."))); + session_replication_state = &replication_states[free_slot]; Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn)); Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));