MM-67099 - Membership Sync fix (#35230)

This commit is contained in:
catalintomai 2026-02-16 16:05:04 +01:00 committed by GitHub
parent d5eeebae82
commit 8738f8c4b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 394 additions and 4 deletions

View file

@ -996,8 +996,11 @@ func TestSharedChannelMembershipSyncSelfReferential(t *testing.T) {
atomic.StoreInt32(countPtr, 0)
}
// Create a new user that will be added "by cluster-2"
// Create a remote user belonging to cluster-2
userFromCluster2 := th.CreateUser(t)
userFromCluster2.RemoteId = &clusters[1].RemoteId
userFromCluster2, appErr = th.App.UpdateUser(th.Context, userFromCluster2, false)
require.Nil(t, appErr)
_, _, appErr = th.App.AddUserToTeam(th.Context, team.Id, userFromCluster2.Id, th.BasicUser.Id)
require.Nil(t, appErr)

View file

@ -77,6 +77,16 @@ func (scs *Service) onReceiveMembershipChanges(syncMsg *model.SyncMsg, rc *model
var successCount, skipCount, failCount int
for _, change := range syncMsg.MembershipChanges {
if change.ChannelId != syncMsg.ChannelId {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "ChannelId mismatch in membership change",
mlog.String("expected", syncMsg.ChannelId),
mlog.String("got", change.ChannelId),
mlog.String("remote_id", rc.RemoteId),
)
failCount++
continue
}
// Check for conflicts
shouldSkip, _ := scs.checkMembershipConflict(change.UserId, change.ChannelId, change.ChangeTime)
if shouldSkip {
@ -140,6 +150,10 @@ func (scs *Service) processMemberAdd(change *model.MembershipChangeMsg, channel
}
}
if user.GetRemoteID() != rc.RemoteId {
return fmt.Errorf("membership add sync failed: %w", ErrRemoteIDMismatch)
}
// Check user permissions for private channels
if channel.Type == model.ChannelTypePrivate {
// Add user to team if needed for private channel
@ -187,11 +201,17 @@ func (scs *Service) processMemberRemove(change *model.MembershipChangeMsg, rc *m
// Continue anyway to update sync status - the channel might be deleted
}
rctx := request.EmptyContext(scs.server.Log())
user, userErr := scs.server.GetStore().User().Get(rctx.Context(), change.UserId)
if userErr != nil {
return fmt.Errorf("cannot get user for channel remove: %w", userErr)
}
if user.GetRemoteID() != rc.RemoteId {
return fmt.Errorf("membership remove sync failed: %w", ErrRemoteIDMismatch)
}
// Use the app layer's remove user method if channel still exists
if channel != nil {
rctx := request.EmptyContext(scs.server.Log())
// We use empty string for removerUserId to indicate system-initiated removal
// This also ensures we bypass permission checks intended for user-initiated removals
appErr := scs.app.RemoveUserFromChannel(rctx, change.UserId, "", channel)
if appErr != nil {
// Ignore "not found" errors - the user might already be removed

View file

@ -0,0 +1,367 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sharedchannel
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin/plugintest/mock"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/v8/channels/store/storetest/mocks"
)
// setupMembershipTest creates the common test fixtures for membership sync tests.
// Returns a Service, the mock stores, and IDs for the channel, remote, and team.
func setupMembershipTest(t *testing.T) (*Service, *MockServerIface, *MockAppIface, *mocks.Store, *mocks.SharedChannelStore, *mocks.ChannelStore, *mocks.UserStore) {
t.Helper()
mockServer := &MockServerIface{}
logger := mlog.CreateConsoleTestLogger(t)
mockServer.On("Log").Return(logger)
mockApp := &MockAppIface{}
scs := &Service{
server: mockServer,
app: mockApp,
}
mockStore := &mocks.Store{}
mockSharedChannelStore := &mocks.SharedChannelStore{}
mockChannelStore := &mocks.ChannelStore{}
mockUserStore := &mocks.UserStore{}
mockStore.On("SharedChannel").Return(mockSharedChannelStore)
mockStore.On("Channel").Return(mockChannelStore)
mockStore.On("User").Return(mockUserStore)
mockServer.On("GetStore").Return(mockStore)
// Enable membership sync feature flag
mockConfig := model.Config{}
mockConfig.SetDefaults()
mockConfig.FeatureFlags.EnableSharedChannelsMemberSync = true
mockServer.On("Config").Return(&mockConfig)
return scs, mockServer, mockApp, mockStore, mockSharedChannelStore, mockChannelStore, mockUserStore
}
func TestOnReceiveMembershipChanges_ChannelIdMismatch(t *testing.T) {
scs, _, _, _, mockSharedChannelStore, mockChannelStore, _ := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
mockSharedChannelStore.On("Get", channelId).Return(&model.SharedChannel{ChannelId: channelId}, nil)
syncMsg := &model.SyncMsg{
ChannelId: channelId,
MembershipChanges: []*model.MembershipChangeMsg{
{
ChannelId: model.NewId(), // different channel ID
UserId: model.NewId(),
IsAdd: true,
ChangeTime: 1000,
},
},
}
err := scs.onReceiveMembershipChanges(syncMsg, rc, nil)
require.NoError(t, err) // function returns nil even on individual failures
// The conflict check should never have been called since the mismatch was caught first
mockSharedChannelStore.AssertNotCalled(t, "GetUserChanges", mock.Anything, mock.Anything, mock.Anything)
}
func TestProcessMemberAdd_RejectsLocalUser(t *testing.T) {
scs, _, mockApp, _, mockSharedChannelStore, mockChannelStore, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
localUserId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
mockSharedChannelStore.On("Get", channelId).Return(&model.SharedChannel{ChannelId: channelId}, nil)
// Local user has no remote ID
localUser := &model.User{Id: localUserId}
mockUserStore.On("Get", mockTypeContext, localUserId).Return(localUser, nil)
// No conflict
mockSharedChannelStore.On("GetUserChanges", localUserId, channelId, mock.AnythingOfType("int64")).Return([]*model.SharedChannelUser{}, nil)
syncMsg := &model.SyncMsg{
ChannelId: channelId,
MembershipChanges: []*model.MembershipChangeMsg{
{
ChannelId: channelId,
UserId: localUserId,
IsAdd: true,
ChangeTime: 1000,
},
},
}
err := scs.onReceiveMembershipChanges(syncMsg, rc, nil)
require.NoError(t, err) // function returns nil even on individual failures
mockApp.AssertNotCalled(t, "AddUserToChannel", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
func TestProcessMemberAdd_RejectsOtherRemoteUser(t *testing.T) {
scs, _, mockApp, _, mockSharedChannelStore, mockChannelStore, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
otherRemoteId := model.NewId()
userId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
mockSharedChannelStore.On("Get", channelId).Return(&model.SharedChannel{ChannelId: channelId}, nil)
// User belongs to a different remote
otherRemoteUser := &model.User{Id: userId, RemoteId: &otherRemoteId}
mockUserStore.On("Get", mockTypeContext, userId).Return(otherRemoteUser, nil)
// No conflict
mockSharedChannelStore.On("GetUserChanges", userId, channelId, mock.AnythingOfType("int64")).Return([]*model.SharedChannelUser{}, nil)
syncMsg := &model.SyncMsg{
ChannelId: channelId,
MembershipChanges: []*model.MembershipChangeMsg{
{
ChannelId: channelId,
UserId: userId,
IsAdd: true,
ChangeTime: 1000,
},
},
}
err := scs.onReceiveMembershipChanges(syncMsg, rc, nil)
require.NoError(t, err)
mockApp.AssertNotCalled(t, "AddUserToChannel", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
func TestProcessMemberAdd_AllowsOwnRemoteUser(t *testing.T) {
scs, _, mockApp, _, mockSharedChannelStore, mockChannelStore, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
userId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
mockSharedChannelStore.On("Get", channelId).Return(&model.SharedChannel{ChannelId: channelId}, nil)
// User belongs to the sending remote
remoteUser := &model.User{Id: userId, RemoteId: &remoteId}
mockUserStore.On("Get", mockTypeContext, userId).Return(remoteUser, nil)
// No conflict
mockSharedChannelStore.On("GetUserChanges", userId, channelId, mock.AnythingOfType("int64")).Return([]*model.SharedChannelUser{}, nil)
// Expect the add to proceed
mockApp.On("AddUserToChannel", mockTypeReqContext, mockTypeUser, mockTypeChannel, true).Return(&model.ChannelMember{}, nil)
mockSharedChannelStore.On("UpdateUserLastMembershipSyncAt", userId, channelId, remoteId, int64(1000)).Return(nil)
syncMsg := &model.SyncMsg{
ChannelId: channelId,
MembershipChanges: []*model.MembershipChangeMsg{
{
ChannelId: channelId,
UserId: userId,
IsAdd: true,
ChangeTime: 1000,
},
},
}
err := scs.onReceiveMembershipChanges(syncMsg, rc, nil)
require.NoError(t, err)
mockApp.AssertCalled(t, "AddUserToChannel", mockTypeReqContext, mockTypeUser, mockTypeChannel, true)
}
func TestProcessMemberRemove_RejectsLocalUser(t *testing.T) {
scs, _, mockApp, _, mockSharedChannelStore, mockChannelStore, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
localUserId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
mockSharedChannelStore.On("Get", channelId).Return(&model.SharedChannel{ChannelId: channelId}, nil)
// Local user has no remote ID
localUser := &model.User{Id: localUserId}
mockUserStore.On("Get", mockTypeContext, localUserId).Return(localUser, nil)
// No conflict
mockSharedChannelStore.On("GetUserChanges", localUserId, channelId, mock.AnythingOfType("int64")).Return([]*model.SharedChannelUser{}, nil)
syncMsg := &model.SyncMsg{
ChannelId: channelId,
MembershipChanges: []*model.MembershipChangeMsg{
{
ChannelId: channelId,
UserId: localUserId,
IsAdd: false,
ChangeTime: 1000,
},
},
}
err := scs.onReceiveMembershipChanges(syncMsg, rc, nil)
require.NoError(t, err)
mockApp.AssertNotCalled(t, "RemoveUserFromChannel", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
func TestProcessMemberRemove_RejectsOtherRemoteUser(t *testing.T) {
scs, _, mockApp, _, mockSharedChannelStore, mockChannelStore, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
otherRemoteId := model.NewId()
userId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
mockSharedChannelStore.On("Get", channelId).Return(&model.SharedChannel{ChannelId: channelId}, nil)
// User belongs to a different remote
otherRemoteUser := &model.User{Id: userId, RemoteId: &otherRemoteId}
mockUserStore.On("Get", mockTypeContext, userId).Return(otherRemoteUser, nil)
// No conflict
mockSharedChannelStore.On("GetUserChanges", userId, channelId, mock.AnythingOfType("int64")).Return([]*model.SharedChannelUser{}, nil)
syncMsg := &model.SyncMsg{
ChannelId: channelId,
MembershipChanges: []*model.MembershipChangeMsg{
{
ChannelId: channelId,
UserId: userId,
IsAdd: false,
ChangeTime: 1000,
},
},
}
err := scs.onReceiveMembershipChanges(syncMsg, rc, nil)
require.NoError(t, err)
mockApp.AssertNotCalled(t, "RemoveUserFromChannel", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
func TestProcessMemberRemove_AllowsOwnRemoteUser(t *testing.T) {
scs, _, mockApp, _, mockSharedChannelStore, mockChannelStore, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
userId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
mockSharedChannelStore.On("Get", channelId).Return(&model.SharedChannel{ChannelId: channelId}, nil)
// User belongs to the sending remote
remoteUser := &model.User{Id: userId, RemoteId: &remoteId}
mockUserStore.On("Get", mockTypeContext, userId).Return(remoteUser, nil)
// No conflict
mockSharedChannelStore.On("GetUserChanges", userId, channelId, mock.AnythingOfType("int64")).Return([]*model.SharedChannelUser{}, nil)
// Expect the remove to proceed
mockApp.On("RemoveUserFromChannel", mockTypeReqContext, userId, "", channel).Return(nil)
mockSharedChannelStore.On("UpdateUserLastMembershipSyncAt", userId, channelId, remoteId, int64(1000)).Return(nil)
syncMsg := &model.SyncMsg{
ChannelId: channelId,
MembershipChanges: []*model.MembershipChangeMsg{
{
ChannelId: channelId,
UserId: userId,
IsAdd: false,
ChangeTime: 1000,
},
},
}
err := scs.onReceiveMembershipChanges(syncMsg, rc, nil)
require.NoError(t, err)
mockApp.AssertCalled(t, "RemoveUserFromChannel", mockTypeReqContext, userId, "", channel)
}
func TestProcessMemberAdd_RejectsLocalUser_ErrorMessage(t *testing.T) {
scs, _, _, _, _, _, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
localUserId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
// Local user (no remote ID) — needed for the fallback User().Get path
mockUserStore.On("Get", mockTypeContext, localUserId).Return(&model.User{Id: localUserId}, nil)
change := &model.MembershipChangeMsg{
ChannelId: channelId,
UserId: localUserId,
IsAdd: true,
ChangeTime: 1000,
}
syncMsg := &model.SyncMsg{
ChannelId: channelId,
}
err := scs.processMemberAdd(change, channel, rc, 1000, syncMsg)
require.Error(t, err)
assert.ErrorIs(t, err, ErrRemoteIDMismatch)
assert.Contains(t, err.Error(), "membership add sync failed")
}
func TestProcessMemberRemove_RejectsLocalUser_ErrorMessage(t *testing.T) {
scs, _, _, _, _, mockChannelStore, mockUserStore := setupMembershipTest(t)
channelId := model.NewId()
remoteId := model.NewId()
localUserId := model.NewId()
rc := &model.RemoteCluster{RemoteId: remoteId}
channel := &model.Channel{Id: channelId, Type: model.ChannelTypeOpen}
mockChannelStore.On("Get", channelId, true).Return(channel, nil)
// Local user (no remote ID)
mockUserStore.On("Get", mockTypeContext, localUserId).Return(&model.User{Id: localUserId}, nil)
change := &model.MembershipChangeMsg{
ChannelId: channelId,
UserId: localUserId,
IsAdd: false,
ChangeTime: 1000,
}
err := scs.processMemberRemove(change, rc, 1000)
require.Error(t, err)
assert.ErrorIs(t, err, ErrRemoteIDMismatch)
assert.Contains(t, err.Error(), "membership remove sync failed")
}