MM-52600: [Shared Channels] Shared channels do not sync channel membership (#30976)

This commit is contained in:
catalintomai 2025-06-15 10:07:56 +02:00 committed by GitHub
parent 0082e3e94d
commit fa1c77d9b0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 3371 additions and 85 deletions

View file

@ -1711,6 +1711,13 @@ func (a *App) addUserToChannel(c request.CTX, user *model.User, channel *model.C
a.Srv().Platform().InvalidateChannelCacheForUser(user.Id)
a.invalidateCacheForChannelMembers(channel.Id)
// Synchronize membership change for shared channels
if channel.IsShared() {
if scs := a.Srv().Platform().GetSharedChannelService(); scs != nil {
scs.HandleMembershipChange(channel.Id, user.Id, true, user.GetRemoteID())
}
}
return newMember, nil
}
@ -2236,7 +2243,12 @@ func (s *Server) getChannelMemberLastViewedAt(c request.CTX, channelID string, u
}
func (a *App) GetChannelMembersPage(c request.CTX, channelID string, page, perPage int) (model.ChannelMembers, *model.AppError) {
channelMembers, err := a.Srv().Store().Channel().GetMembers(channelID, page*perPage, perPage)
opts := model.ChannelMembersGetOptions{
ChannelID: channelID,
Offset: page * perPage,
Limit: perPage,
}
channelMembers, err := a.Srv().Store().Channel().GetMembers(opts)
if err != nil {
return nil, model.NewAppError("GetChannelMembersPage", "app.channel.get_members.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
@ -2740,6 +2752,14 @@ func (a *App) removeUserFromChannel(c request.CTX, userIDToRemove string, remove
userMsg.Add("remover_id", removerUserId)
a.Publish(userMsg)
// Synchronize membership change for shared channels
if channel.IsShared() {
// isAdd=false, empty remoteId means locally initiated
if scs := a.Srv().Platform().GetSharedChannelService(); scs != nil {
scs.HandleMembershipChange(channel.Id, userIDToRemove, false, "")
}
}
return nil
}
@ -3639,7 +3659,12 @@ func (a *App) forEachChannelMember(c request.CTX, channelID string, f func(model
page := 0
for {
channelMembers, err := a.Srv().Store().Channel().GetMembers(channelID, page*perPage, perPage)
opts := model.ChannelMembersGetOptions{
ChannelID: channelID,
Offset: page * perPage,
Limit: perPage,
}
channelMembers, err := a.Srv().Store().Channel().GetMembers(opts)
if err != nil {
return err
}

View file

@ -2513,8 +2513,16 @@ func TestClearChannelMembersCache(t *testing.T) {
ChannelId: "1",
})
}
mockChannelStore.On("GetMembers", "channelID", 0, 100).Return(cms, nil)
mockChannelStore.On("GetMembers", "channelID", 100, 100).Return(model.ChannelMembers{
mockChannelStore.On("GetMembers", model.ChannelMembersGetOptions{
ChannelID: "channelID",
Offset: 0,
Limit: 100,
}).Return(cms, nil)
mockChannelStore.On("GetMembers", model.ChannelMembersGetOptions{
ChannelID: "channelID",
Offset: 100,
Limit: 100,
}).Return(model.ChannelMembers{
model.ChannelMember{
ChannelId: "1",
},

View file

@ -23,6 +23,7 @@ type SharedChannelServiceIFace interface {
CheckChannelNotShared(channelID string) error
CheckChannelIsShared(channelID string) error
CheckCanInviteToSharedChannel(channelId string) error
HandleMembershipChange(channelID, userID string, isAdd bool, remoteID string)
}
type MockOptionSharedChannelService func(service *mockSharedChannelService)
@ -77,3 +78,7 @@ func (mrcs *mockSharedChannelService) SendChannelInvite(channel *model.Channel,
func (mrcs *mockSharedChannelService) NumInvitations() int {
return mrcs.numInvitations
}
func (mrcs *mockSharedChannelService) HandleMembershipChange(channelID, userID string, isAdd bool, remoteID string) {
// This is a mock implementation - it doesn't need to do anything
}

File diff suppressed because it is too large Load diff

View file

@ -26,6 +26,7 @@ type SharedChannelServiceIFace interface {
CheckChannelNotShared(channelID string) error
CheckChannelIsShared(channelID string) error
CheckCanInviteToSharedChannel(channelId string) error
HandleMembershipChange(channelID, userID string, isAdd bool, remoteID string)
}
func NewMockSharedChannelService(service SharedChannelServiceIFace) *mockSharedChannelService {
@ -91,3 +92,9 @@ func (mrcs *mockSharedChannelService) SendChannelInvite(channel *model.Channel,
func (mrcs *mockSharedChannelService) NumInvitations() int {
return mrcs.numInvitations
}
func (mrcs *mockSharedChannelService) HandleMembershipChange(channelID, userID string, isAdd bool, remoteID string) {
if mrcs.SharedChannelServiceIFace != nil {
mrcs.SharedChannelServiceIFace.HandleMembershipChange(channelID, userID, isAdd, remoteID)
}
}

View file

@ -105,6 +105,27 @@ func (h *SelfReferentialSyncHandler) HandleRequest(w http.ResponseWriter, r *htt
}
}
// Handle membership sync using unified field
if len(syncMsg.MembershipChanges) > 0 {
batch := make([]string, 0)
for _, change := range syncMsg.MembershipChanges {
if change.IsAdd {
syncResp.UsersSyncd = append(syncResp.UsersSyncd, change.UserId)
batch = append(batch, change.UserId)
}
}
// Call appropriate callback
if len(batch) > 0 {
if h.OnBatchSync != nil {
h.OnBatchSync(batch, currentCall)
}
if len(batch) == 1 && h.OnIndividualSync != nil {
h.OnIndividualSync(batch[0], currentCall)
}
}
}
_ = response.SetPayload(syncResp)
}
}
@ -135,28 +156,36 @@ func (h *SelfReferentialSyncHandler) GetSyncMessageCount() int32 {
return atomic.LoadInt32(h.syncMessageCount)
}
// GetUsersFromSyncMsg extracts user IDs from a sync message
func GetUsersFromSyncMsg(msg model.SyncMsg) []string {
var userIds []string
// Extract from users field
for userId := range msg.Users {
userIds = append(userIds, userId)
}
return userIds
}
// EnsureCleanState ensures a clean test state by removing all shared channels, remote clusters,
// and extra team/channel members. This helps prevent state pollution between tests.
func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) {
t.Helper()
// First, wait for any pending async tasks to complete, then shutdown services
scsInterface := th.App.Srv().GetSharedChannelSyncService()
if scsInterface != nil && scsInterface.Active() {
// Cast to concrete type to access testing methods
if service, ok := scsInterface.(*sharedchannel.Service); ok {
// Wait for any pending tasks from previous tests to complete
require.Eventually(t, func() bool {
return !service.HasPendingTasksForTesting()
}, 10*time.Second, 100*time.Millisecond, "All pending sync tasks should complete before cleanup")
}
// Shutdown the shared channel service to stop any async operations
_ = scsInterface.Shutdown()
// Wait for shutdown to complete with more time
require.Eventually(t, func() bool {
return !scsInterface.Active()
}, 5*time.Second, 100*time.Millisecond, "Shared channel service should be inactive after shutdown")
}
// Clear all shared channels and remotes from previous tests
allSharedChannels, _ := ss.SharedChannel().GetAll(0, 1000, model.SharedChannelFilterOpts{})
for _, sc := range allSharedChannels {
// Delete all remotes for this channel
remotes, _ := ss.SharedChannel().GetRemotes(0, 100, model.SharedChannelRemoteFilterOpts{ChannelId: sc.ChannelId})
remotes, _ := ss.SharedChannel().GetRemotes(0, 999999, model.SharedChannelRemoteFilterOpts{ChannelId: sc.ChannelId})
for _, remote := range remotes {
_, _ = ss.SharedChannel().DeleteRemote(remote.Id)
}
@ -170,13 +199,32 @@ func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) {
_, _ = ss.RemoteCluster().Delete(rc.RemoteId)
}
// Clear all SharedChannelUsers sync state - this is critical for test isolation
// The SharedChannelUsers table tracks per-user sync timestamps that can interfere between tests
_, _ = th.SQLStore.GetMaster().Exec("DELETE FROM SharedChannelUsers WHERE 1=1")
// Clear all SharedChannelAttachments sync state
_, _ = th.SQLStore.GetMaster().Exec("DELETE FROM SharedChannelAttachments WHERE 1=1")
// Reset sync cursors in any remaining SharedChannelRemotes (before they get deleted)
// This ensures cursors don't persist if deletion fails
_, _ = th.SQLStore.GetMaster().Exec(`UPDATE SharedChannelRemotes SET
LastPostCreateAt = 0,
LastPostCreateId = '',
LastPostUpdateAt = 0,
LastPostId = '',
LastMembersSyncAt = 0
WHERE 1=1`)
// Remove all channel members from test channels (except the basic team/channel setup)
channels, _ := ss.Channel().GetAll(th.BasicTeam.Id)
for _, channel := range channels {
// Skip direct message and group channels, and skip the default channels
if channel.Type != model.ChannelTypeDirect && channel.Type != model.ChannelTypeGroup &&
channel.Id != th.BasicChannel.Id {
members, _ := ss.Channel().GetMembers(channel.Id, 0, 10000)
members, _ := ss.Channel().GetMembers(model.ChannelMembersGetOptions{
ChannelID: channel.Id,
})
for _, member := range members {
_ = ss.Channel().RemoveMember(th.Context, channel.Id, member.UserId)
}
@ -228,12 +276,16 @@ func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) {
cfg.ConnectedWorkspacesSettings.GlobalUserSyncBatchSize = &defaultBatchSize
})
// Ensure services are running and ready
scsInterface := th.App.Srv().GetSharedChannelSyncService()
if scs, ok := scsInterface.(*sharedchannel.Service); ok {
require.Eventually(t, func() bool {
return scs.Active()
}, 2*time.Second, 100*time.Millisecond, "Shared channel service should be active")
// Restart services and ensure they are running and ready
if scsInterface != nil {
// Restart the shared channel service
_ = scsInterface.Start()
if scs, ok := scsInterface.(*sharedchannel.Service); ok {
require.Eventually(t, func() bool {
return scs.Active()
}, 5*time.Second, 100*time.Millisecond, "Shared channel service should be active after restart")
}
}
rcService := th.App.Srv().GetRemoteClusterService()
@ -243,6 +295,6 @@ func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) {
}
require.Eventually(t, func() bool {
return rcService.Active()
}, 2*time.Second, 100*time.Millisecond, "Remote cluster service should be active")
}, 5*time.Second, 100*time.Millisecond, "Remote cluster service should be active")
}
}

View file

@ -16,6 +16,7 @@ func setupSharedChannels(tb testing.TB) *TestHelper {
return SetupConfig(tb, func(cfg *model.Config) {
*cfg.ConnectedWorkspacesSettings.EnableRemoteClusterService = true
*cfg.ConnectedWorkspacesSettings.EnableSharedChannels = true
cfg.FeatureFlags.EnableSharedChannelsMemberSync = true
})
}

View file

@ -275,6 +275,8 @@ channels/db/migrations/mysql/000138_add_default_category_name_to_channel.down.sq
channels/db/migrations/mysql/000138_add_default_category_name_to_channel.up.sql
channels/db/migrations/mysql/000139_remoteclusters_add_last_global_user_sync_at.down.sql
channels/db/migrations/mysql/000139_remoteclusters_add_last_global_user_sync_at.up.sql
channels/db/migrations/mysql/000140_add_lastmemberssyncat_to_sharedchannelremotes.down.sql
channels/db/migrations/mysql/000140_add_lastmemberssyncat_to_sharedchannelremotes.up.sql
channels/db/migrations/postgres/000001_create_teams.down.sql
channels/db/migrations/postgres/000001_create_teams.up.sql
channels/db/migrations/postgres/000002_create_team_members.down.sql
@ -551,3 +553,5 @@ channels/db/migrations/postgres/000138_add_default_category_name_to_channel.down
channels/db/migrations/postgres/000138_add_default_category_name_to_channel.up.sql
channels/db/migrations/postgres/000139_remoteclusters_add_last_global_user_sync_at.down.sql
channels/db/migrations/postgres/000139_remoteclusters_add_last_global_user_sync_at.up.sql
channels/db/migrations/postgres/000140_add_lastmemberssyncat_to_sharedchannelremotes.down.sql
channels/db/migrations/postgres/000140_add_lastmemberssyncat_to_sharedchannelremotes.up.sql

View file

@ -0,0 +1,29 @@
SET @preparedStatement = (SELECT IF(
(
SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'SharedChannelRemotes'
AND table_schema = DATABASE()
AND column_name = 'LastMembersSyncAt'
) > 0,
'ALTER TABLE SharedChannelRemotes DROP COLUMN LastMembersSyncAt;',
'SELECT 1'
));
PREPARE alterIfExists FROM @preparedStatement;
EXECUTE alterIfExists;
DEALLOCATE PREPARE alterIfExists;
SET @preparedStatement2 = (SELECT IF(
(
SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'SharedChannelUsers'
AND table_schema = DATABASE()
AND column_name = 'LastMembershipSyncAt'
) > 0,
'ALTER TABLE SharedChannelUsers DROP COLUMN LastMembershipSyncAt;',
'SELECT 1'
));
PREPARE alterIfExists2 FROM @preparedStatement2;
EXECUTE alterIfExists2;
DEALLOCATE PREPARE alterIfExists2;

View file

@ -0,0 +1,29 @@
SET @preparedStatement = (SELECT IF(
(
SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'SharedChannelRemotes'
AND table_schema = DATABASE()
AND column_name = 'LastMembersSyncAt'
) > 0,
'SELECT 1',
'ALTER TABLE SharedChannelRemotes ADD LastMembersSyncAt bigint DEFAULT 0;'
));
PREPARE alterIfNotExists FROM @preparedStatement;
EXECUTE alterIfNotExists;
DEALLOCATE PREPARE alterIfNotExists;
SET @preparedStatement2 = (SELECT IF(
(
SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'SharedChannelUsers'
AND table_schema = DATABASE()
AND column_name = 'LastMembershipSyncAt'
) > 0,
'SELECT 1',
'ALTER TABLE SharedChannelUsers ADD LastMembershipSyncAt bigint DEFAULT 0;'
));
PREPARE alterIfNotExists2 FROM @preparedStatement2;
EXECUTE alterIfNotExists2;
DEALLOCATE PREPARE alterIfNotExists2;

View file

@ -0,0 +1,2 @@
ALTER TABLE sharedchannelremotes DROP COLUMN IF EXISTS lastmemberssyncat;
ALTER TABLE sharedchannelusers DROP COLUMN IF EXISTS lastmembershipsyncat;

View file

@ -0,0 +1,2 @@
ALTER TABLE sharedchannelremotes ADD COLUMN IF NOT EXISTS lastmemberssyncat bigint DEFAULT 0;
ALTER TABLE sharedchannelusers ADD COLUMN IF NOT EXISTS lastmembershipsyncat bigint DEFAULT 0;

View file

@ -2081,11 +2081,11 @@ func (s *RetryLayerChannelStore) GetMemberLastViewedAt(ctx context.Context, chan
}
func (s *RetryLayerChannelStore) GetMembers(channelID string, offset int, limit int) (model.ChannelMembers, error) {
func (s *RetryLayerChannelStore) GetMembers(opts model.ChannelMembersGetOptions) (model.ChannelMembers, error) {
tries := 0
for {
result, err := s.ChannelStore.GetMembers(channelID, offset, limit)
result, err := s.ChannelStore.GetMembers(opts)
if err == nil {
return result, nil
}
@ -11642,6 +11642,27 @@ func (s *RetryLayerSharedChannelStore) GetSingleUser(userID string, channelID st
}
func (s *RetryLayerSharedChannelStore) GetUserChanges(userID string, channelID string, afterTime int64) ([]*model.SharedChannelUser, error) {
tries := 0
for {
result, err := s.SharedChannelStore.GetUserChanges(userID, channelID, afterTime)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerSharedChannelStore) GetUsersForSync(filter model.GetUsersForSyncFilter) ([]*model.User, error) {
tries := 0
@ -11894,6 +11915,48 @@ func (s *RetryLayerSharedChannelStore) UpdateRemoteCursor(id string, cursor mode
}
func (s *RetryLayerSharedChannelStore) UpdateRemoteMembershipCursor(id string, syncTime int64) error {
tries := 0
for {
err := s.SharedChannelStore.UpdateRemoteMembershipCursor(id, syncTime)
if err == nil {
return nil
}
if !isRepeatableError(err) {
return err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerSharedChannelStore) UpdateUserLastMembershipSyncAt(userID string, channelID string, remoteID string, syncTime int64) error {
tries := 0
for {
err := s.SharedChannelStore.UpdateUserLastMembershipSyncAt(userID, channelID, remoteID, syncTime)
if err == nil {
return nil
}
if !isRepeatableError(err) {
return err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerSharedChannelStore) UpdateUserLastSyncAt(userID string, channelID string, remoteID string) error {
tries := 0

View file

@ -2070,22 +2070,34 @@ func (s SqlChannelStore) PatchMultipleMembersNotifyProps(members []*model.Channe
return updated, nil
}
func (s SqlChannelStore) GetMembers(channelID string, offset, limit int) (model.ChannelMembers, error) {
sql, args, err := s.channelMembersForTeamWithSchemeSelectQuery.
func (s SqlChannelStore) GetMembers(opts model.ChannelMembersGetOptions) (model.ChannelMembers, error) {
query := s.channelMembersForTeamWithSchemeSelectQuery.
Where(sq.Eq{
"ChannelId": channelID,
}).
Limit(uint64(limit)).
Offset(uint64(offset)).
ToSql()
"ChannelId": opts.ChannelID,
})
if opts.UpdatedAfter > 0 {
query = query.Where(sq.Gt{"ChannelMembers.LastUpdateAt": opts.UpdatedAfter})
query = query.OrderBy("ChannelMembers.LastUpdateAt")
}
if opts.Limit > 0 {
query = query.Limit(uint64(opts.Limit))
}
if opts.Offset > 0 {
query = query.Offset(uint64(opts.Offset))
}
sql, args, err := query.ToSql()
if err != nil {
return nil, errors.Wrapf(err, "GetMember_ToSql ChannelID=%s", channelID)
return nil, errors.Wrapf(err, "GetMember_ToSql ChannelID=%s", opts.ChannelID)
}
dbMembers := channelMemberWithSchemeRolesList{}
err = s.GetReplica().Select(&dbMembers, sql, args...)
if err != nil {
return nil, errors.Wrapf(err, "failed to get ChannelMembers with channelId=%s", channelID)
return nil, errors.Wrapf(err, "failed to get ChannelMembers with channelId=%s", opts.ChannelID)
}
return dbMembers.ToModel(), nil

View file

@ -417,6 +417,7 @@ func sharedChannelRemoteFields(prefix string) []string {
"COALESCE(" + prefix + "LastPostCreateID,'') AS LastPostCreateID",
prefix + "LastPostUpdateAt",
"COALESCE(" + prefix + "LastPostId,'') AS LastPostUpdateID",
prefix + "LastMembersSyncAt",
}
}
@ -708,6 +709,7 @@ func sharedChannelUserFields(prefix string) []string {
prefix + "RemoteId",
prefix + "CreateAt",
prefix + "LastSyncAt",
prefix + "LastMembershipSyncAt",
}
}
@ -720,7 +722,7 @@ func (s SqlSharedChannelStore) SaveUser(scUser *model.SharedChannelUser) (*model
query, args, err := s.getQueryBuilder().Insert("SharedChannelUsers").
Columns(sharedChannelUserFields("")...).
Values(scUser.Id, scUser.UserId, scUser.ChannelId, scUser.RemoteId, scUser.CreateAt, scUser.LastSyncAt).
Values(scUser.Id, scUser.UserId, scUser.ChannelId, scUser.RemoteId, scUser.CreateAt, scUser.LastSyncAt, scUser.LastMembershipSyncAt).
ToSql()
if err != nil {
return nil, errors.Wrapf(err, "savesharedchanneluser_tosql")
@ -853,6 +855,25 @@ func (s SqlSharedChannelStore) UpdateUserLastSyncAt(userID string, channelID str
return nil
}
// UpdateUserLastMembershipSyncAt updates the LastMembershipSyncAt timestamp for the specified SharedChannelUser using the provided sync time.
func (s SqlSharedChannelStore) UpdateUserLastMembershipSyncAt(userID string, channelID string, remoteID string, syncTime int64) error {
query := s.getQueryBuilder().
Update("SharedChannelUsers AS scu").
Set("LastMembershipSyncAt", sq.Expr("GREATEST(scu.LastMembershipSyncAt, ?)", syncTime)).
Where(sq.Eq{
"scu.UserId": userID,
"scu.ChannelId": channelID,
"scu.RemoteId": remoteID,
})
_, err := s.GetMaster().ExecBuilder(query)
if err != nil {
return fmt.Errorf("failed to update LastMembershipSyncAt for SharedChannelUser with userId=%s, channelId=%s, remoteId=%s: %w",
userID, channelID, remoteID, err)
}
return nil
}
func sharedChannelAttachementFields(prefix string) []string {
if prefix != "" && !strings.HasSuffix(prefix, ".") {
prefix = prefix + "."

View file

@ -0,0 +1,66 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sqlstore
import (
"database/sql"
"fmt"
"github.com/mattermost/mattermost/server/public/model"
sq "github.com/mattermost/squirrel"
"github.com/pkg/errors"
)
// UpdateRemoteMembershipCursor updates the LastMembersSyncAt timestamp for the specified SharedChannelRemote,
// but only if the new timestamp is greater than the current value.
func (s SqlSharedChannelStore) UpdateRemoteMembershipCursor(id string, syncTime int64) error {
query := s.getQueryBuilder().
Update("SharedChannelRemotes")
query = query.Set("LastMembersSyncAt", sq.Expr("GREATEST(LastMembersSyncAt, ?)", syncTime))
query = query.Where(sq.Eq{"Id": id})
result, err := s.GetMaster().ExecBuilder(query)
if err != nil {
return errors.Wrap(err, "failed to update membership cursor for SharedChannelRemote")
}
count, err := result.RowsAffected()
if err != nil {
return errors.Wrap(err, "failed to determine rows affected")
}
if count == 0 {
return fmt.Errorf("id not found: %s", id)
}
return nil
}
// GetUserChanges gets all SharedChannelUser changes for a given user, channel after a specific time.
// This is used to detect if there are conflicting membership changes.
func (s SqlSharedChannelStore) GetUserChanges(userID string, channelID string, afterTime int64) ([]*model.SharedChannelUser, error) {
squery, args, err := s.getQueryBuilder().
Select(sharedChannelUserFields("")...).
From("SharedChannelUsers").
Where(sq.Eq{"SharedChannelUsers.UserId": userID}).
Where(sq.Eq{"SharedChannelUsers.ChannelId": channelID}).
Where(sq.Gt{"SharedChannelUsers.LastSyncAt": afterTime}).
ToSql()
if err != nil {
return nil, errors.Wrapf(err, "getsharedchanneluserchanges_tosql")
}
users := []*model.SharedChannelUser{}
if err := s.GetReplica().Select(&users, squery, args...); err != nil {
if err == sql.ErrNoRows {
return make([]*model.SharedChannelUser, 0), nil
}
return nil, errors.Wrapf(err, "failed to find shared channel user changes with UserId=%s, ChannelId=%s, afterTime=%d",
userID, channelID, afterTime)
}
return users, nil
}

View file

@ -232,7 +232,7 @@ type ChannelStore interface {
// It replaces existing fields and creates new ones which don't exist.
UpdateMemberNotifyProps(channelID, userID string, props map[string]string) (*model.ChannelMember, error)
PatchMultipleMembersNotifyProps(members []*model.ChannelMemberIdentifier, notifyProps map[string]string) ([]*model.ChannelMember, error)
GetMembers(channelID string, offset, limit int) (model.ChannelMembers, error)
GetMembers(opts model.ChannelMembersGetOptions) (model.ChannelMembers, error)
GetMember(ctx context.Context, channelID string, userID string) (*model.ChannelMember, error)
GetMemberLastViewedAt(ctx context.Context, channelID string, userID string) (int64, error)
GetChannelMembersTimezones(channelID string) ([]model.StringMap, error)
@ -1014,6 +1014,7 @@ type SharedChannelStore interface {
GetRemoteByIds(channelID string, remoteID string) (*model.SharedChannelRemote, error)
GetRemotes(offset, limit int, opts model.SharedChannelRemoteFilterOpts) ([]*model.SharedChannelRemote, error)
UpdateRemoteCursor(id string, cursor model.GetPostsSinceForSyncCursor) error
UpdateRemoteMembershipCursor(id string, syncTime int64) error
DeleteRemote(remoteID string) (bool, error)
GetRemotesStatus(channelID string) ([]*model.SharedChannelRemoteStatus, error)
@ -1021,7 +1022,9 @@ type SharedChannelStore interface {
GetSingleUser(userID string, channelID string, remoteID string) (*model.SharedChannelUser, error)
GetUsersForUser(userID string) ([]*model.SharedChannelUser, error)
GetUsersForSync(filter model.GetUsersForSyncFilter) ([]*model.User, error)
GetUserChanges(userID string, channelID string, afterTime int64) ([]*model.SharedChannelUser, error)
UpdateUserLastSyncAt(userID string, channelID string, remoteID string) error
UpdateUserLastMembershipSyncAt(userID string, channelID string, remoteID string, syncTime int64) error
SaveAttachment(remote *model.SharedChannelAttachment) (*model.SharedChannelAttachment, error)
UpsertAttachment(remote *model.SharedChannelAttachment) (string, error)

View file

@ -73,6 +73,7 @@ func TestChannelStore(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore
t.Run("Save", func(t *testing.T) { testChannelStoreSave(t, rctx, ss) })
t.Run("SaveDirectChannel", func(t *testing.T) { testChannelStoreSaveDirectChannel(t, rctx, ss, s) })
t.Run("CreateDirectChannel", func(t *testing.T) { testChannelStoreCreateDirectChannel(t, rctx, ss) })
t.Run("GetMembersWithCursorPagination", func(t *testing.T) { testChannelStoreGetMembersWithCursorPagination(t, rctx, ss) })
t.Run("Update", func(t *testing.T) { testChannelStoreUpdate(t, rctx, ss) })
t.Run("GetChannelUnread", func(t *testing.T) { testGetChannelUnread(t, rctx, ss) })
t.Run("Get", func(t *testing.T) { testChannelStoreGet(t, rctx, ss, s) })
@ -265,7 +266,7 @@ func testChannelStoreSaveDirectChannel(t *testing.T, rctx request.CTX, ss store.
_, nErr = ss.Channel().SaveDirectChannel(rctx, &o1, &m1, &m2)
require.NoError(t, nErr, "couldn't save direct channel", nErr)
members, nErr := ss.Channel().GetMembers(o1.Id, 0, 100)
members, nErr := ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: o1.Id, Offset: 0, Limit: 100})
require.NoError(t, nErr)
require.Len(t, members, 2, "should have saved 2 members")
@ -305,7 +306,7 @@ func testChannelStoreSaveDirectChannel(t *testing.T, rctx request.CTX, ss store.
_, nErr = ss.Channel().SaveDirectChannel(rctx, &o1, &m1, &m1)
require.NoError(t, nErr, "couldn't save direct channel", nErr)
members, nErr = ss.Channel().GetMembers(o1.Id, 0, 100)
members, nErr = ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: o1.Id, Offset: 0, Limit: 100})
require.NoError(t, nErr)
require.Len(t, members, 1, "should have saved just 1 member")
@ -341,11 +342,72 @@ func testChannelStoreCreateDirectChannel(t *testing.T, rctx request.CTX, ss stor
ss.Channel().PermanentDelete(rctx, c1.Id)
}()
members, nErr := ss.Channel().GetMembers(c1.Id, 0, 100)
members, nErr := ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: c1.Id, Offset: 0, Limit: 100})
require.NoError(t, nErr)
require.Len(t, members, 2, "should have saved 2 members")
}
// testChannelStoreGetMembersWithCursorPagination tests the cursor-based pagination functionality
// of the GetMembers method, using the UpdatedAfter parameter to return only members that were
// updated after a specific timestamp.
func testChannelStoreGetMembersWithCursorPagination(t *testing.T, rctx request.CTX, ss store.Store) {
// Create two users
u1 := &model.User{}
u1.Email = MakeEmail()
u1.Nickname = model.NewId()
_, err := ss.User().Save(rctx, u1)
require.NoError(t, err)
_, nErr := ss.Team().SaveMember(rctx, &model.TeamMember{TeamId: model.NewId(), UserId: u1.Id}, -1)
require.NoError(t, nErr)
u2 := &model.User{}
u2.Email = MakeEmail()
u2.Nickname = model.NewId()
_, err = ss.User().Save(rctx, u2)
require.NoError(t, err)
_, nErr = ss.Team().SaveMember(rctx, &model.TeamMember{TeamId: model.NewId(), UserId: u2.Id}, -1)
require.NoError(t, nErr)
// Create direct channel between the users
c1, nErr := ss.Channel().CreateDirectChannel(rctx, u1, u2)
require.NoError(t, nErr, "couldn't create direct channel", nErr)
defer func() {
ss.Channel().PermanentDeleteMembersByChannel(rctx, c1.Id)
ss.Channel().PermanentDelete(rctx, c1.Id)
}()
// First get all members
members, nErr := ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: c1.Id, Offset: 0, Limit: 100})
require.NoError(t, nErr)
require.Len(t, members, 2, "should have saved 2 members")
// Ensure members have different LastUpdateAt values by updating one of them after a short delay
time.Sleep(1 * time.Millisecond)
member := members[0]
_, err = ss.Channel().UpdateMember(rctx, &member)
require.NoError(t, err)
// Get members again after the update
members, nErr = ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: c1.Id, Offset: 0, Limit: 100})
require.NoError(t, nErr)
require.Len(t, members, 2, "should have 2 members")
// Find member with smaller LastUpdateAt
sort.Slice(members, func(i, j int) bool {
return members[i].LastUpdateAt < members[j].LastUpdateAt
})
updateTime := members[0].LastUpdateAt
// Test cursor-based pagination by querying for members updated after that timestamp
membersAfter, nErr := ss.Channel().GetMembers(model.ChannelMembersGetOptions{
ChannelID: c1.Id,
UpdatedAfter: updateTime,
Limit: 100,
})
require.NoError(t, nErr)
require.Len(t, membersAfter, 1, "should have found only 1 member created after the timestamp")
}
func testChannelStoreUpdate(t *testing.T, rctx request.CTX, ss store.Store) {
o1 := model.Channel{}
o1.TeamId = model.NewId()
@ -7867,7 +7929,7 @@ func testChannelStoreRemoveAllDeactivatedMembers(t *testing.T, rctx request.CTX,
require.NoError(t, err)
// Get all the channel members. Check there are 3.
d1, err := ss.Channel().GetMembers(c1.Id, 0, 1000)
d1, err := ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: c1.Id, Offset: 0, Limit: 1000})
assert.NoError(t, err)
assert.Len(t, d1, 3)
@ -7887,7 +7949,7 @@ func testChannelStoreRemoveAllDeactivatedMembers(t *testing.T, rctx request.CTX,
assert.NoError(t, ss.Channel().RemoveAllDeactivatedMembers(rctx, c1.Id))
// Get all the channel members. Check there is now only 1: m3.
d2, err := ss.Channel().GetMembers(c1.Id, 0, 1000)
d2, err := ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: c1.Id, Offset: 0, Limit: 1000})
assert.NoError(t, err)
assert.Len(t, d2, 1)
assert.Equal(t, u3.Id, d2[0].UserId)

View file

@ -5340,7 +5340,7 @@ func groupTestpUpdateMembersRoleChannel(t *testing.T, rctx request.CTX, ss store
}
assert.ElementsMatch(t, tt.expectedUpdatedUsers, updatedUserIDs)
members, err := ss.Channel().GetMembers(channel.Id, 0, 100)
members, err := ss.Channel().GetMembers(model.ChannelMembersGetOptions{ChannelID: channel.Id, Offset: 0, Limit: 100})
require.NoError(t, err)
assert.GreaterOrEqual(t, len(members), 4) // sanity check for channel membership

View file

@ -1580,9 +1580,9 @@ func (_m *ChannelStore) GetMemberLastViewedAt(ctx context.Context, channelID str
return r0, r1
}
// GetMembers provides a mock function with given fields: channelID, offset, limit
func (_m *ChannelStore) GetMembers(channelID string, offset int, limit int) (model.ChannelMembers, error) {
ret := _m.Called(channelID, offset, limit)
// GetMembers provides a mock function with given fields: opts
func (_m *ChannelStore) GetMembers(opts model.ChannelMembersGetOptions) (model.ChannelMembers, error) {
ret := _m.Called(opts)
if len(ret) == 0 {
panic("no return value specified for GetMembers")
@ -1590,19 +1590,19 @@ func (_m *ChannelStore) GetMembers(channelID string, offset int, limit int) (mod
var r0 model.ChannelMembers
var r1 error
if rf, ok := ret.Get(0).(func(string, int, int) (model.ChannelMembers, error)); ok {
return rf(channelID, offset, limit)
if rf, ok := ret.Get(0).(func(model.ChannelMembersGetOptions) (model.ChannelMembers, error)); ok {
return rf(opts)
}
if rf, ok := ret.Get(0).(func(string, int, int) model.ChannelMembers); ok {
r0 = rf(channelID, offset, limit)
if rf, ok := ret.Get(0).(func(model.ChannelMembersGetOptions) model.ChannelMembers); ok {
r0 = rf(opts)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(model.ChannelMembers)
}
}
if rf, ok := ret.Get(1).(func(string, int, int) error); ok {
r1 = rf(channelID, offset, limit)
if rf, ok := ret.Get(1).(func(model.ChannelMembersGetOptions) error); ok {
r1 = rf(opts)
} else {
r1 = ret.Error(1)
}

View file

@ -368,6 +368,36 @@ func (_m *SharedChannelStore) GetSingleUser(userID string, channelID string, rem
return r0, r1
}
// GetUserChanges provides a mock function with given fields: userID, channelID, afterTime
func (_m *SharedChannelStore) GetUserChanges(userID string, channelID string, afterTime int64) ([]*model.SharedChannelUser, error) {
ret := _m.Called(userID, channelID, afterTime)
if len(ret) == 0 {
panic("no return value specified for GetUserChanges")
}
var r0 []*model.SharedChannelUser
var r1 error
if rf, ok := ret.Get(0).(func(string, string, int64) ([]*model.SharedChannelUser, error)); ok {
return rf(userID, channelID, afterTime)
}
if rf, ok := ret.Get(0).(func(string, string, int64) []*model.SharedChannelUser); ok {
r0 = rf(userID, channelID, afterTime)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*model.SharedChannelUser)
}
}
if rf, ok := ret.Get(1).(func(string, string, int64) error); ok {
r1 = rf(userID, channelID, afterTime)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetUsersForSync provides a mock function with given fields: filter
func (_m *SharedChannelStore) GetUsersForSync(filter model.GetUsersForSyncFilter) ([]*model.User, error) {
ret := _m.Called(filter)
@ -700,6 +730,42 @@ func (_m *SharedChannelStore) UpdateRemoteCursor(id string, cursor model.GetPost
return r0
}
// UpdateRemoteMembershipCursor provides a mock function with given fields: id, syncTime
func (_m *SharedChannelStore) UpdateRemoteMembershipCursor(id string, syncTime int64) error {
ret := _m.Called(id, syncTime)
if len(ret) == 0 {
panic("no return value specified for UpdateRemoteMembershipCursor")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, int64) error); ok {
r0 = rf(id, syncTime)
} else {
r0 = ret.Error(0)
}
return r0
}
// UpdateUserLastMembershipSyncAt provides a mock function with given fields: userID, channelID, remoteID, syncTime
func (_m *SharedChannelStore) UpdateUserLastMembershipSyncAt(userID string, channelID string, remoteID string, syncTime int64) error {
ret := _m.Called(userID, channelID, remoteID, syncTime)
if len(ret) == 0 {
panic("no return value specified for UpdateUserLastMembershipSyncAt")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, string, string, int64) error); ok {
r0 = rf(userID, channelID, remoteID, syncTime)
} else {
r0 = ret.Error(0)
}
return r0
}
// UpdateUserLastSyncAt provides a mock function with given fields: userID, channelID, remoteID
func (_m *SharedChannelStore) UpdateUserLastSyncAt(userID string, channelID string, remoteID string) error {
ret := _m.Called(userID, channelID, remoteID)

View file

@ -1729,10 +1729,10 @@ func (s *TimerLayerChannelStore) GetMemberLastViewedAt(ctx context.Context, chan
return result, err
}
func (s *TimerLayerChannelStore) GetMembers(channelID string, offset int, limit int) (model.ChannelMembers, error) {
func (s *TimerLayerChannelStore) GetMembers(opts model.ChannelMembersGetOptions) (model.ChannelMembers, error) {
start := time.Now()
result, err := s.ChannelStore.GetMembers(channelID, offset, limit)
result, err := s.ChannelStore.GetMembers(opts)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
@ -9173,6 +9173,22 @@ func (s *TimerLayerSharedChannelStore) GetSingleUser(userID string, channelID st
return result, err
}
func (s *TimerLayerSharedChannelStore) GetUserChanges(userID string, channelID string, afterTime int64) ([]*model.SharedChannelUser, error) {
start := time.Now()
result, err := s.SharedChannelStore.GetUserChanges(userID, channelID, afterTime)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("SharedChannelStore.GetUserChanges", success, elapsed)
}
return result, err
}
func (s *TimerLayerSharedChannelStore) GetUsersForSync(filter model.GetUsersForSyncFilter) ([]*model.User, error) {
start := time.Now()
@ -9365,6 +9381,38 @@ func (s *TimerLayerSharedChannelStore) UpdateRemoteCursor(id string, cursor mode
return err
}
func (s *TimerLayerSharedChannelStore) UpdateRemoteMembershipCursor(id string, syncTime int64) error {
start := time.Now()
err := s.SharedChannelStore.UpdateRemoteMembershipCursor(id, syncTime)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("SharedChannelStore.UpdateRemoteMembershipCursor", success, elapsed)
}
return err
}
func (s *TimerLayerSharedChannelStore) UpdateUserLastMembershipSyncAt(userID string, channelID string, remoteID string, syncTime int64) error {
start := time.Now()
err := s.SharedChannelStore.UpdateUserLastMembershipSyncAt(userID, channelID, remoteID, syncTime)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("SharedChannelStore.UpdateUserLastMembershipSyncAt", success, elapsed)
}
return err
}
func (s *TimerLayerSharedChannelStore) UpdateUserLastSyncAt(userID string, channelID string, remoteID string) error {
start := time.Now()

View file

@ -290,7 +290,7 @@ func (rcs *Service) pause() {
rcs.server.Log().Debug("Remote Cluster Service inactive")
}
// SetActive forces the service to be active or inactive
// SetActive forces the service to be active or inactive for testing
func (rcs *Service) SetActive(active bool) {
rcs.mux.Lock()
defer rcs.mux.Unlock()

View file

@ -89,6 +89,7 @@ func (scs *Service) SendChannelInvite(channel *model.Channel, userId string, rc
RemoteId: rc.RemoteId,
IsInviteAccepted: true,
IsInviteConfirmed: false,
LastMembersSyncAt: 0,
}
if _, err = scs.server.GetStore().SharedChannel().SaveRemote(scr); err != nil {
scs.sendEphemeralPost(channel.Id, userId, fmt.Sprintf("Error saving channel invite for %s: %v", rc.DisplayName, err))
@ -134,6 +135,7 @@ func (scs *Service) SendChannelInvite(channel *model.Channel, userId string, rc
}
curTime := model.GetMillis()
var sharedChannelRemote *model.SharedChannelRemote
if existingScr != nil {
if existingScr.DeleteAt == 0 && existingScr.IsInviteConfirmed {
// the shared channel remote exists and is not
@ -153,6 +155,7 @@ func (scs *Service) SendChannelInvite(channel *model.Channel, userId string, rc
scs.sendEphemeralPost(channel.Id, userId, fmt.Sprintf("Error confirming channel invite for %s: %v", rc.DisplayName, sErr))
return
}
sharedChannelRemote = existingScr
} else {
// the shared channel remote doesn't exists, so we create it
scr := &model.SharedChannelRemote{
@ -163,15 +166,26 @@ func (scs *Service) SendChannelInvite(channel *model.Channel, userId string, rc
IsInviteConfirmed: true,
LastPostCreateAt: curTime,
LastPostUpdateAt: curTime,
LastMembersSyncAt: 0,
}
if _, err = scs.server.GetStore().SharedChannel().SaveRemote(scr); err != nil {
scs.sendEphemeralPost(channel.Id, userId, fmt.Sprintf("Error confirming channel invite for %s: %v", rc.DisplayName, err))
return
}
sharedChannelRemote = scr
}
scs.NotifyChannelChanged(sc.ChannelId)
scs.sendEphemeralPost(channel.Id, userId, fmt.Sprintf("`%s` has been added to channel.", rc.DisplayName))
// Sync all channel members to the remote now that the remote entry exists
if syncErr := scs.SyncAllChannelMembers(sc.ChannelId, rc.RemoteId, sharedChannelRemote); syncErr != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to sync channel members after invite confirmation",
mlog.String("channel_id", sc.ChannelId),
mlog.String("remote_id", rc.RemoteId),
mlog.Err(syncErr),
)
}
}
if rc.IsPlugin() {
@ -311,6 +325,15 @@ func (scs *Service) onReceiveChannelInvite(msg model.RemoteClusterMsg, rc *model
if _, err := scs.server.GetStore().SharedChannel().UpdateRemote(existingScr); err != nil {
return fmt.Errorf("cannot restore deleted shared channel remote (channel_id=%s): %w", invite.ChannelId, err)
}
// Sync local channel members to the remote after restoring the shared channel
if syncErr := scs.SyncAllChannelMembers(channel.Id, rc.RemoteId, existingScr); syncErr != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to sync local channel members after restoring shared channel",
mlog.String("channel_id", channel.Id),
mlog.String("remote_id", rc.RemoteId),
mlog.Err(syncErr),
)
}
} else {
creatorID := channel.CreatorId
if creatorID == "" {
@ -325,6 +348,7 @@ func (scs *Service) onReceiveChannelInvite(msg model.RemoteClusterMsg, rc *model
RemoteId: rc.RemoteId,
LastPostCreateAt: model.GetMillis(),
LastPostUpdateAt: model.GetMillis(),
LastMembersSyncAt: 0,
}
if _, err := scs.server.GetStore().SharedChannel().SaveRemote(scr); err != nil {
@ -336,6 +360,15 @@ func (scs *Service) onReceiveChannelInvite(msg model.RemoteClusterMsg, rc *model
scs.server.GetStore().SharedChannel().Delete(sharedChannel.ChannelId)
return fmt.Errorf("cannot create shared channel remote (channel_id=%s): %w", invite.ChannelId, err)
}
// Sync local channel members to the remote after accepting the invitation
if syncErr := scs.SyncAllChannelMembers(channel.Id, rc.RemoteId, scr); syncErr != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to sync local channel members after accepting invitation",
mlog.String("channel_id", channel.Id),
mlog.String("remote_id", rc.RemoteId),
mlog.Err(syncErr),
)
}
}
return nil
}

View file

@ -29,6 +29,18 @@ var (
mockTypeContext = mock.MatchedBy(func(ctx context.Context) bool { return true })
)
// setupMockServerWithConfig sets up the standard mocks that all tests need
func setupMockServerWithConfig(mockServer *MockServerIface) {
// Mock Config for feature flag check - disable membership sync to avoid complex mocking
mockConfig := model.Config{}
mockConfig.SetDefaults()
mockConfig.FeatureFlags.EnableSharedChannelsMemberSync = false
mockServer.On("Config").Return(&mockConfig)
// Mock GetRemoteClusterService for feature flag check
mockServer.On("GetRemoteClusterService").Return(nil)
}
func TestOnReceiveChannelInvite(t *testing.T) {
t.Run("when msg payload is empty, it does nothing", func(t *testing.T) {
mockServer := &MockServerIface{}
@ -92,6 +104,8 @@ func TestOnReceiveChannelInvite(t *testing.T) {
mockStore.On("SharedChannel").Return(&mockSharedChannelStore)
mockServer.On("GetStore").Return(mockStore)
setupMockServerWithConfig(mockServer)
createPostPermission := model.ChannelModeratedPermissionsMap[model.PermissionCreatePost.Id]
createReactionPermission := model.ChannelModeratedPermissionsMap[model.PermissionAddReaction.Id]
updateMap := model.ChannelModeratedRolesPatch{
@ -216,6 +230,8 @@ func TestOnReceiveChannelInvite(t *testing.T) {
mockStore.On("SharedChannel").Return(&mockSharedChannelStore)
mockServer.On("GetStore").Return(mockStore)
setupMockServerWithConfig(mockServer)
defer mockApp.AssertExpectations(t)
err = scs.onReceiveChannelInvite(msg, remoteCluster, nil)
@ -351,6 +367,7 @@ func TestOnReceiveChannelInvite(t *testing.T) {
mockServer = scs.server.(*MockServerIface)
mockServer.On("GetStore").Return(mockStore)
setupMockServerWithConfig(mockServer)
mockApp.On("GetOrCreateDirectChannel", mockTypeReqContext, mockTypeString, mockTypeString, mock.AnythingOfType("model.ChannelOption")).
Return(channel, nil).Maybe()

View file

@ -0,0 +1,409 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sharedchannel
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
)
// isChannelMemberSyncEnabled checks if the feature flag is enabled and remote cluster service is available
func (scs *Service) isChannelMemberSyncEnabled() bool {
featureFlagEnabled := scs.server.Config().FeatureFlags.EnableSharedChannelsMemberSync
remoteClusterService := scs.server.GetRemoteClusterService()
return featureFlagEnabled && remoteClusterService != nil
}
// queueMembershipSyncTask creates and queues a task to synchronize channel membership changes
func (scs *Service) queueMembershipSyncTask(channelID, userID, remoteID string, syncMsg *model.SyncMsg, retryMsg *model.SyncMsg) {
task := newSyncTask(channelID, userID, remoteID, syncMsg, retryMsg)
task.schedule = time.Now().Add(NotifyMinimumDelay)
scs.addTask(task)
}
// HandleMembershipChange is called when users are added or removed from a shared channel.
// It creates a task to notify all remote clusters about the membership change.
func (scs *Service) HandleMembershipChange(channelID, userID string, isAdd bool, remoteID string) {
if !scs.isChannelMemberSyncEnabled() {
return
}
// Create timestamp for consistent usage
changeTime := model.GetMillis()
// Create membership change info
syncMsg := model.NewSyncMsg(channelID)
syncMsg.MembershipChanges = []*model.MembershipChangeMsg{
{
ChannelId: channelID,
UserId: userID,
IsAdd: isAdd,
RemoteId: remoteID, // which remote initiated this change
ChangeTime: changeTime,
},
}
// Queue the membership change task
scs.queueMembershipSyncTask(channelID, userID, "", syncMsg, nil)
}
// HandleMembershipBatchChange is called to process a batch of membership changes for a shared channel.
// It creates a task to notify all remote clusters about the batch membership changes.
func (scs *Service) HandleMembershipBatchChange(channelID string, userIDs []string, isAdd bool, remoteID string) {
if !scs.isChannelMemberSyncEnabled() {
return
}
if len(userIDs) == 0 {
return
}
// Create timestamp for consistent usage
changeTime := model.GetMillis()
// Create sync message with membership changes
syncMsg := model.NewSyncMsg(channelID)
syncMsg.MembershipChanges = make([]*model.MembershipChangeMsg, 0, len(userIDs))
// Add each user to the batch
for _, userID := range userIDs {
syncMsg.MembershipChanges = append(syncMsg.MembershipChanges, &model.MembershipChangeMsg{
ChannelId: channelID,
UserId: userID,
IsAdd: isAdd,
RemoteId: remoteID,
ChangeTime: changeTime,
})
}
// Queue the batch membership sync task
scs.queueMembershipSyncTask(channelID, "", "", syncMsg, nil)
}
// SyncAllChannelMembers synchronizes all channel members to a specific remote.
// This is typically called when a channel is first shared with a remote cluster.
// If remote is provided, it will be used instead of fetching from the database.
func (scs *Service) SyncAllChannelMembers(channelID string, remoteID string, remote *model.SharedChannelRemote) error {
if !scs.isChannelMemberSyncEnabled() {
return nil
}
// Verify the channel exists and is shared
if _, err := scs.server.GetStore().SharedChannel().Get(channelID); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Failed to get shared channel",
mlog.String("channel_id", channelID),
mlog.Err(err),
)
return fmt.Errorf("failed to get shared channel %s: %w", channelID, err)
}
// Get the remote to ensure it exists (if not provided)
if remote == nil {
var err error
remote, err = scs.server.GetStore().SharedChannel().GetRemoteByIds(channelID, remoteID)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Failed to get remote",
mlog.String("channel_id", channelID),
mlog.String("remote_id", remoteID),
mlog.Err(err),
)
return fmt.Errorf("failed to get remote for channel %s: %w", channelID, err)
}
}
// Use offset-based pagination to handle channels with many members
// This ensures we don't skip members when multiple members have the same LastUpdateAt timestamp
maxPerPage := scs.GetMemberSyncBatchSize()
var allMembers model.ChannelMembers
lastSyncAt := remote.LastMembersSyncAt
offset := 0
// Process members incrementally with offset-based pagination
for {
opts := model.ChannelMembersGetOptions{
ChannelID: channelID,
UpdatedAfter: lastSyncAt,
Limit: maxPerPage,
Offset: offset,
}
members, err1 := scs.server.GetStore().Channel().GetMembers(opts)
if err1 != nil {
return fmt.Errorf("failed to get members for channel %s: %w", channelID, err1)
}
if len(members) == 0 {
break // No more members to process
}
// Add to our collection
allMembers = append(allMembers, members...)
// Log progress when processing large channels
if len(allMembers)%1000 == 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Processing channel members in batches",
mlog.String("channel_id", channelID),
mlog.String("remote_id", remoteID),
mlog.Int("processed_so_far", len(allMembers)),
)
}
if len(members) < maxPerPage {
break // Last page
}
// Move to next page
offset += maxPerPage
}
if len(allMembers) == 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "No members to sync for channel",
mlog.String("channel_id", channelID),
mlog.String("remote_id", remoteID),
)
return nil
}
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Syncing all channel members",
mlog.String("channel_id", channelID),
mlog.String("remote_id", remoteID),
mlog.Int("member_count", len(allMembers)),
)
// Get batch size from config
batchSize := scs.GetMemberSyncBatchSize()
// For small channels, queue individual membership changes
if len(allMembers) <= batchSize {
return scs.syncMembersIndividually(channelID, remoteID, allMembers, remote)
}
// For larger channels, use batch processing
return scs.syncMembersInBatches(channelID, remoteID, allMembers, remote)
}
// syncMembersIndividually processes each member individually
// This is more efficient for small channels
func (scs *Service) syncMembersIndividually(channelID, remoteID string, members model.ChannelMembers, remote *model.SharedChannelRemote) error {
// Queue individual membership changes for each member
for _, member := range members {
// Queue membership change for this user (isAdd=true)
scs.HandleMembershipChange(channelID, member.UserId, true, "")
}
return nil
}
// syncMembersInBatches processes members in batches for greater efficiency
// This is better for channels with many members
func (scs *Service) syncMembersInBatches(channelID, remoteID string, members model.ChannelMembers, remote *model.SharedChannelRemote) error {
// Get batch size from config
batchSize := scs.GetMemberSyncBatchSize()
for i := 0; i < len(members); i += batchSize {
end := i + batchSize
if end > len(members) {
end = len(members)
}
// Create a batch of members
batchMembers := members[i:end]
// Extract user IDs from the batch
userIDs := make([]string, len(batchMembers))
for j, member := range batchMembers {
userIDs[j] = member.UserId
}
// Use the batch handling function to queue the changes
scs.HandleMembershipBatchChange(channelID, userIDs, true, "")
}
return nil
}
// processMembershipChange processes a channel membership change task.
// It determines which remotes should receive the update and creates tasks for each.
func (scs *Service) processMembershipChange(syncMsg *model.SyncMsg) {
if len(syncMsg.MembershipChanges) == 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Invalid membership change task - no membership changes",
mlog.String("channel_id", syncMsg.ChannelId),
)
return
}
// Get the shared channel (to verify it exists)
_, err := scs.server.GetStore().SharedChannel().Get(syncMsg.ChannelId)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to get shared channel for membership change",
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Int("change_count", len(syncMsg.MembershipChanges)),
mlog.Err(err),
)
return
}
// Get all remotes for this channel
remotes, err := scs.server.GetStore().SharedChannel().GetRemotes(0, 999999, model.SharedChannelRemoteFilterOpts{
ChannelId: syncMsg.ChannelId,
})
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to get shared channel remotes for membership change",
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Err(err),
)
return
}
// Always use batch processing for consistency (works for single or multiple changes)
scs.syncMembershipBatchToRemotes(syncMsg, remotes)
}
// syncMembershipBatchToRemotes synchronizes membership changes (single or batch) with remote clusters.
func (scs *Service) syncMembershipBatchToRemotes(syncMsg *model.SyncMsg, remotes []*model.SharedChannelRemote) {
if len(syncMsg.MembershipChanges) == 0 {
return
}
// Get the initiating remote ID from the first change (all should be the same)
initiatingRemoteId := ""
if len(syncMsg.MembershipChanges) > 0 {
initiatingRemoteId = syncMsg.MembershipChanges[0].RemoteId
}
// Send to all remotes except the one that initiated this change
for _, remote := range remotes {
// Skip the remote that initiated this change to prevent loops
if remote.RemoteId == initiatingRemoteId {
continue
}
// Get the remote cluster
rc, err := scs.server.GetStore().RemoteCluster().Get(remote.RemoteId, false)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to get remote cluster for batch membership sync",
mlog.String("remote_id", remote.RemoteId),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Err(err),
)
continue
}
// Create a copy of the sync message to potentially add user profiles
enrichedSyncMsg := &model.SyncMsg{
Id: syncMsg.Id,
ChannelId: syncMsg.ChannelId,
MembershipChanges: syncMsg.MembershipChanges,
Users: make(map[string]*model.User),
}
// Add user profiles for all users being added
for _, change := range syncMsg.MembershipChanges {
if change.IsAdd {
user, pErr := scs.server.GetStore().User().Get(context.Background(), change.UserId)
if pErr != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceWarn, "Failed to get user for batch membership sync",
mlog.String("user_id", change.UserId),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.String("remote_id", remote.RemoteId),
mlog.Err(pErr),
)
continue
}
// Check if user profile needs to be synced
doSync, _, sErr := scs.shouldUserSync(user, syncMsg.ChannelId, rc)
if sErr == nil && doSync {
enrichedSyncMsg.Users[user.Id] = user
}
}
}
// Send message using the existing remote cluster framework
payload, err := json.Marshal(enrichedSyncMsg)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to marshal batch membership message",
mlog.String("remote_id", remote.RemoteId),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Err(err),
)
continue
}
msg := model.RemoteClusterMsg{
Id: model.NewId(),
Topic: TopicChannelMembership,
CreateAt: model.GetMillis(),
Payload: payload,
}
ctx, cancel := context.WithTimeout(context.Background(), remotecluster.SendTimeout)
defer cancel()
// Define a callback function
callback := func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *remotecluster.Response, err error) {
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Error sending batch membership changes to remote",
mlog.String("remote", remote.RemoteId),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Int("change_count", len(syncMsg.MembershipChanges)),
mlog.Err(err),
)
return
}
if resp != nil && resp.Err != "" {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Remote error when processing batch membership changes",
mlog.String("remote", remote.RemoteId),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.String("remote_error", resp.Err),
)
return
}
// Update sync timestamps
for _, change := range syncMsg.MembershipChanges {
if err := scs.server.GetStore().SharedChannel().UpdateUserLastMembershipSyncAt(change.UserId, change.ChannelId, remote.RemoteId, change.ChangeTime); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to update user membership sync timestamp in batch",
mlog.String("user_id", change.UserId),
mlog.Err(err),
)
}
}
// Update the cursor with the latest change time
var maxChangeTime int64
for _, change := range syncMsg.MembershipChanges {
if change.ChangeTime > maxChangeTime {
maxChangeTime = change.ChangeTime
}
}
if err := scs.updateMembershipSyncCursor(syncMsg.ChannelId, remote.RemoteId, maxChangeTime); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to update membership sync cursor for batch",
mlog.String("remote_id", remote.RemoteId),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Err(err),
)
}
}
err = scs.server.GetRemoteClusterService().SendMsg(ctx, msg, rc, callback)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to send batch membership changes to remote",
mlog.String("remote_id", remote.RemoteId),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Err(err),
)
}
}
}

View file

@ -0,0 +1,213 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sharedchannel
import (
"fmt"
"strings"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
)
// checkMembershipConflict checks if there are newer changes that would conflict with this one
// Returns true if this change should be skipped due to a conflict
func (scs *Service) checkMembershipConflict(userID, channelID string, changeTime int64) (bool, error) {
conflicts, err := scs.server.GetStore().SharedChannel().GetUserChanges(userID, channelID, changeTime)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to check for membership change conflicts",
mlog.String("user_id", userID),
mlog.String("channel_id", channelID),
mlog.Err(err),
)
return false, err
}
// If there are conflicting operations, the latest one wins
for _, conflict := range conflicts {
if conflict.LastMembershipSyncAt > changeTime {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Ignoring older membership change due to conflict",
mlog.String("user_id", userID),
mlog.String("channel_id", channelID),
mlog.Int("change_time", int(changeTime)),
mlog.Int("conflicting_time", int(conflict.LastMembershipSyncAt)),
)
return true, nil
}
}
return false, nil
}
// onReceiveMembershipChanges processes channel membership changes from a remote cluster
func (scs *Service) onReceiveMembershipChanges(syncMsg *model.SyncMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
// Check if feature flag is enabled
if !scs.server.Config().FeatureFlags.EnableSharedChannelsMemberSync {
return nil
}
if len(syncMsg.MembershipChanges) == 0 {
return fmt.Errorf("onReceiveMembershipChanges: no membership changes")
}
// Get the channel to make sure it exists and is shared
channel, err := scs.server.GetStore().Channel().Get(syncMsg.ChannelId, true)
if err != nil {
return fmt.Errorf("cannot get channel for membership changes: %w", err)
}
// Verify this is a valid shared channel
_, err = scs.server.GetStore().SharedChannel().Get(syncMsg.ChannelId)
if err != nil {
return fmt.Errorf("cannot get shared channel for membership changes: %w", err)
}
// Calculate the maximum ChangeTime from all changes in the batch
var maxChangeTime int64
for _, change := range syncMsg.MembershipChanges {
if change.ChangeTime > maxChangeTime {
maxChangeTime = change.ChangeTime
}
}
// Process each change
var successCount, skipCount, failCount int
for _, change := range syncMsg.MembershipChanges {
// Check for conflicts
shouldSkip, _ := scs.checkMembershipConflict(change.UserId, change.ChannelId, change.ChangeTime)
if shouldSkip {
skipCount++
continue
}
// Process the membership change based on whether it's an add or remove
var processErr error
if change.IsAdd {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Adding user to channel from remote cluster",
mlog.String("user_id", change.UserId),
mlog.String("channel_id", change.ChannelId),
mlog.String("remote_id", rc.RemoteId),
)
processErr = scs.processMemberAdd(change, channel, rc, maxChangeTime)
} else {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Removing user from channel from remote cluster",
mlog.String("user_id", change.UserId),
mlog.String("channel_id", change.ChannelId),
mlog.String("remote_id", rc.RemoteId),
)
processErr = scs.processMemberRemove(change, rc, maxChangeTime)
}
if processErr != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to process membership change",
mlog.String("user_id", change.UserId),
mlog.String("channel_id", change.ChannelId),
mlog.String("remote_id", rc.RemoteId),
mlog.Bool("is_add", change.IsAdd),
mlog.Err(processErr),
)
failCount++
continue
}
successCount++
}
return nil
}
// processMemberAdd handles adding a user to a channel as part of batch processing
func (scs *Service) processMemberAdd(change *model.MembershipChangeMsg, channel *model.Channel, rc *model.RemoteCluster, maxChangeTime int64) error {
// Get the user if they exist
user, err := scs.server.GetStore().User().Get(request.EmptyContext(scs.server.Log()).Context(), change.UserId)
if err != nil {
return fmt.Errorf("cannot get user for channel add: %w", err)
}
// Check user permissions for private channels
if channel.Type == model.ChannelTypePrivate {
// Add user to team if needed for private channel
rctx := request.EmptyContext(scs.server.Log())
appErr := scs.app.AddUserToTeamByTeamId(rctx, channel.TeamId, user)
if appErr != nil {
return fmt.Errorf("cannot add user to team for private channel: %w", appErr)
}
}
// Use the app layer to add the user to the channel
// This ensures proper processing of all side effects
rctx := request.EmptyContext(scs.server.Log())
_, appErr := scs.app.AddUserToChannel(rctx, user, channel, false)
if appErr != nil {
// Skip "already added" errors
if appErr.Error() != "api.channel.add_user.to_channel.failed.app_error" &&
!strings.Contains(appErr.Error(), "channel_member_exists") {
return fmt.Errorf("cannot add user to channel: %w", appErr)
}
// User is already in the channel, which is fine
}
// Update the sync status
if syncErr := scs.server.GetStore().SharedChannel().UpdateUserLastMembershipSyncAt(change.UserId, change.ChannelId, rc.RemoteId, maxChangeTime); syncErr != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to update user LastMembershipSyncAt after batch member add",
mlog.String("user_id", change.UserId),
mlog.String("channel_id", change.ChannelId),
mlog.String("remote_id", rc.RemoteId),
mlog.Err(syncErr),
)
// Continue despite the error - this is not critical
}
return nil
}
// processMemberRemove handles removing a user from a channel as part of batch processing
func (scs *Service) processMemberRemove(change *model.MembershipChangeMsg, rc *model.RemoteCluster, maxChangeTime int64) error {
// Get channel so we can use app layer methods properly
channel, err := scs.server.GetStore().Channel().Get(change.ChannelId, true)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceWarn, "Cannot find channel for member removal",
mlog.String("channel_id", change.ChannelId),
mlog.String("user_id", change.UserId),
mlog.Err(err),
)
// Continue anyway to update sync status - the channel might be deleted
}
// Use the app layer's remove user method if channel still exists
if channel != nil {
rctx := request.EmptyContext(scs.server.Log())
// We use empty string for removerUserId to indicate system-initiated removal
// This also ensures we bypass permission checks intended for user-initiated removals
appErr := scs.app.RemoveUserFromChannel(rctx, change.UserId, "", channel)
if appErr != nil {
// Ignore "not found" errors - the user might already be removed
if !strings.Contains(appErr.Error(), "store.sql_channel.remove_member.missing.app_error") {
scs.server.Log().Log(mlog.LvlSharedChannelServiceWarn, "Error removing user from channel",
mlog.String("channel_id", change.ChannelId),
mlog.String("user_id", change.UserId),
mlog.Err(appErr),
)
// Continue anyway to update sync status - don't return error here
// to ensure sync status still gets updated
}
}
}
// Update the sync status
if syncErr := scs.server.GetStore().SharedChannel().UpdateUserLastMembershipSyncAt(change.UserId, change.ChannelId, rc.RemoteId, maxChangeTime); syncErr != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to update user LastMembershipSyncAt after batch member remove",
mlog.String("user_id", change.UserId),
mlog.String("channel_id", change.ChannelId),
mlog.String("remote_id", rc.RemoteId),
mlog.Err(syncErr),
)
// Continue despite the error - this is not critical
}
return nil
}

View file

@ -513,6 +513,26 @@ func (_m *MockAppIface) Publish(message *model.WebSocketEvent) {
_m.Called(message)
}
// RemoveUserFromChannel provides a mock function with given fields: c, userID, removerUserId, channel
func (_m *MockAppIface) RemoveUserFromChannel(c request.CTX, userID string, removerUserId string, channel *model.Channel) *model.AppError {
ret := _m.Called(c, userID, removerUserId, channel)
if len(ret) == 0 {
panic("no return value specified for RemoveUserFromChannel")
}
var r0 *model.AppError
if rf, ok := ret.Get(0).(func(request.CTX, string, string, *model.Channel) *model.AppError); ok {
r0 = rf(c, userID, removerUserId, channel)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.AppError)
}
}
return r0
}
// SaveAndBroadcastStatus provides a mock function with given fields: status
func (_m *MockAppIface) SaveAndBroadcastStatus(status *model.Status) {
_m.Called(status)

View file

@ -24,6 +24,7 @@ const (
TopicSync = "sharedchannel_sync"
TopicChannelInvite = "sharedchannel_invite"
TopicUploadCreate = "sharedchannel_upload"
TopicChannelMembership = "sharedchannel_membership"
TopicGlobalUserSync = "sharedchannel_global_user_sync"
MaxRetries = 3
MaxUsersPerSync = 25
@ -31,6 +32,7 @@ const (
NotifyMinimumDelay = time.Second * 2
MaxUpsertRetries = 25
ProfileImageSyncTimeout = time.Second * 5
// Default value for MaxMembersPerBatch is defined in config.go as ConnectedWorkspacesSettingsDefaultMemberSyncBatchSize
)
// Mocks can be re-generated with `make sharedchannel-mocks`.
@ -58,6 +60,7 @@ type AppIface interface {
UserCanSeeOtherUser(c request.CTX, userID string, otherUserId string) (bool, *model.AppError)
AddUserToChannel(c request.CTX, user *model.User, channel *model.Channel, skipTeamMemberIntegrityCheck bool) (*model.ChannelMember, *model.AppError)
AddUserToTeamByTeamId(c request.CTX, teamId string, user *model.User) *model.AppError
RemoveUserFromChannel(c request.CTX, userID string, removerUserId string, channel *model.Channel) *model.AppError
PermanentDeleteChannel(c request.CTX, channel *model.Channel) *model.AppError
CreatePost(c request.CTX, post *model.Post, channel *model.Channel, flags model.CreatePostFlags) (savedPost *model.Post, err *model.AppError)
UpdatePost(c request.CTX, post *model.Post, updatePostOptions *model.UpdatePostOptions) (*model.Post, *model.AppError)
@ -90,9 +93,10 @@ type Service struct {
changeSignal chan struct{}
// everything below guarded by `mux`
mux sync.RWMutex
active bool
leaderListenerId string
mux sync.RWMutex
active bool
leaderListenerId string
connectionStateListenerId string
done chan struct{}
tasks map[string]syncTask
@ -136,6 +140,8 @@ func (scs *Service) Start() error {
scs.connectionStateListenerId = rcs.AddConnectionStateListener(scs.onConnectionStateChange)
scs.mux.Unlock()
rcs.AddTopicListener(TopicChannelMembership, scs.onReceiveSyncMessage)
scs.onClusterLeaderChange()
return nil
@ -220,6 +226,14 @@ func (scs *Service) pause() {
scs.server.Log().Debug("Shared Channel Service inactive")
}
// GetMemberSyncBatchSize returns the configured batch size for member synchronization
func (scs *Service) GetMemberSyncBatchSize() int {
if scs.server.Config().ConnectedWorkspacesSettings.MemberSyncBatchSize != nil {
return *scs.server.Config().ConnectedWorkspacesSettings.MemberSyncBatchSize
}
return model.ConnectedWorkspacesSettingsDefaultMemberSyncBatchSize
}
// Makes the remote channel to be read-only(announcement mode, only admins can create posts and reactions).
func (scs *Service) makeChannelReadOnly(channel *model.Channel) *model.AppError {
createPostPermission := model.ChannelModeratedPermissionsMap[model.PermissionCreatePost.Id]
@ -314,17 +328,19 @@ func (scs *Service) scheduleGlobalUserSync(rc *model.RemoteCluster) {
}()
}
// OnReceiveSyncMessageForTesting exposes onReceiveSyncMessage for testing
func (scs *Service) OnReceiveSyncMessageForTesting(msg model.RemoteClusterMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
return scs.onReceiveSyncMessage(msg, rc, response)
}
// GetUserSyncBatchSizeForTesting returns the configured batch size for user syncing (exported for testing)
func (scs *Service) GetUserSyncBatchSizeForTesting() int {
return scs.getGlobalUserSyncBatchSize()
// HasPendingTasksForTesting returns true if there are pending sync tasks in the queue
func (scs *Service) HasPendingTasksForTesting() bool {
scs.mux.RLock()
defer scs.mux.RUnlock()
return len(scs.tasks) > 0
}
// HandleSyncAllUsersForTesting exposes syncAllUsers for testing
func (scs *Service) HandleSyncAllUsersForTesting(rc *model.RemoteCluster) error {
return scs.syncAllUsers(rc)
}
// OnReceiveSyncMessageForTesting exposes onReceiveSyncMessage for testing
func (scs *Service) OnReceiveSyncMessageForTesting(msg model.RemoteClusterMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
return scs.onReceiveSyncMessage(msg, rc, response)
}

View file

@ -277,3 +277,41 @@ func (scs *Service) CheckCanInviteToSharedChannel(channelId string) error {
}
return nil
}
// updateMembershipSyncCursor updates the LastMembersSyncAt value for the shared channel remote
// This provides centralized and consistent cursor management
func (scs *Service) updateMembershipSyncCursor(channelID string, remoteID string, newTimestamp int64) error {
// Get the remote record
scr, err := scs.server.GetStore().SharedChannel().GetRemoteByIds(channelID, remoteID)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to get shared channel remote for cursor update",
mlog.String("channel_id", channelID),
mlog.String("remote_id", remoteID),
mlog.Int("timestamp", int(newTimestamp)),
mlog.Err(err),
)
return fmt.Errorf("failed to get shared channel remote for cursor update: %w", err)
}
if scr == nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Shared channel remote not found for cursor update",
mlog.String("channel_id", channelID),
mlog.String("remote_id", remoteID),
)
return fmt.Errorf("shared channel remote not found for channel %s and remote %s", channelID, remoteID)
}
// Update the cursor - the store will handle ensuring it only moves forward
err = scs.server.GetStore().SharedChannel().UpdateRemoteMembershipCursor(scr.Id, newTimestamp)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to update membership cursor",
mlog.String("channel_id", channelID),
mlog.String("remote_id", remoteID),
mlog.String("remote_record_id", scr.Id),
mlog.Int("timestamp", int(newTimestamp)),
mlog.Err(err),
)
}
return err
}

View file

@ -24,10 +24,9 @@ var (
)
func (scs *Service) onReceiveSyncMessage(msg model.RemoteClusterMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
if msg.Topic != TopicSync && msg.Topic != TopicGlobalUserSync {
return fmt.Errorf("wrong topic, expected `%s` or `%s`, got `%s`", TopicSync, TopicGlobalUserSync, msg.Topic)
if msg.Topic != TopicSync && msg.Topic != TopicChannelMembership && msg.Topic != TopicGlobalUserSync {
return fmt.Errorf("wrong topic, expected sync-related topic, got `%s`", msg.Topic)
}
if len(msg.Payload) == 0 {
return errors.New("empty sync message")
}
@ -89,6 +88,15 @@ func (scs *Service) processSyncMessage(c request.CTX, syncMsg *model.SyncMsg, rc
ReactionErrors: make([]string, 0),
}
// Check if feature flag is enabled for membership changes
membershipSyncEnabled := scs.server.Config().FeatureFlags.EnableSharedChannelsMemberSync
hasMembershipChanges := len(syncMsg.MembershipChanges) > 0
// If this message only contains membership changes and feature is disabled, skip it
if hasMembershipChanges && !membershipSyncEnabled && len(syncMsg.Users) == 0 && len(syncMsg.Posts) == 0 && len(syncMsg.Reactions) == 0 {
return nil
}
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Sync msg received",
mlog.String("remote", rc.Name),
mlog.String("channel_id", syncMsg.ChannelId),
@ -96,6 +104,7 @@ func (scs *Service) processSyncMessage(c request.CTX, syncMsg *model.SyncMsg, rc
mlog.Int("post_count", len(syncMsg.Posts)),
mlog.Int("reaction_count", len(syncMsg.Reactions)),
mlog.Int("status_count", len(syncMsg.Statuses)),
mlog.Int("membership_change_count", len(syncMsg.MembershipChanges)),
)
// Check if this is a global user sync message (no channel ID and only users)
@ -229,6 +238,19 @@ func (scs *Service) processSyncMessage(c request.CTX, syncMsg *model.SyncMsg, rc
scs.app.SaveAndBroadcastStatus(status)
}
// Process membership changes after users have been synced
if hasMembershipChanges && membershipSyncEnabled {
if err := scs.onReceiveMembershipChanges(syncMsg, rc, response); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Error processing membership changes",
mlog.String("remote", rc.Name),
mlog.String("channel_id", syncMsg.ChannelId),
mlog.Int("change_count", len(syncMsg.MembershipChanges)),
mlog.Err(err),
)
// Don't fail the entire sync if membership changes fail
}
}
response.SetPayload(syncResp)
return nil

View file

@ -37,8 +37,17 @@ func newSyncTask(channelID, userID string, remoteID string, existingMsg, retryMs
retryID = retryMsg.Id
}
// Generate a unique task ID
taskID := channelID + userID + remoteID + retryID // combination of ids to avoid duplicates
// For batch tasks, add a batch identifier to make the ID unique
if existingMsg != nil && len(existingMsg.MembershipChanges) > 1 {
batchID := model.NewId()[:8] // Use a short unique ID for the batch
taskID = channelID + "batch" + batchID + remoteID + retryID
}
return syncTask{
id: channelID + userID + remoteID + retryID, // combination of ids to avoid duplicates
id: taskID,
channelID: channelID,
userID: userID,
remoteID: remoteID, // empty means update all remote clusters
@ -235,6 +244,7 @@ func (scs *Service) ForceSyncForRemote(rc *model.RemoteCluster) {
// addTask adds or re-adds a task to the queue.
func (scs *Service) addTask(task syncTask) {
task.AddedAt = time.Now()
scs.mux.Lock()
if originalTask, ok := scs.tasks[task.id]; ok {
// if the task was already scheduled, we only update the
@ -365,6 +375,16 @@ func (scs *Service) removeOldestTask() (syncTask, bool, time.Duration) {
// processTask updates one or more remote clusters with any new channel content.
func (scs *Service) processTask(task syncTask) error {
// Check if this is a membership change task
if task.existingMsg != nil && len(task.existingMsg.MembershipChanges) > 0 {
// Check if feature flag is enabled
if !scs.server.Config().FeatureFlags.EnableSharedChannelsMemberSync {
return nil
}
scs.processMembershipChange(task.existingMsg)
return nil
}
// map is used to ensure remotes don't get sync'd twice, such as when
// they have the autoinvited flag and have explicitly subscribed to a channel.
remotesMap := make(map[string]*model.RemoteCluster)

View file

@ -119,6 +119,7 @@ func (scs *Service) syncForRemote(task syncTask, rc *model.RemoteCluster) error
RemoteId: rc.RemoteId,
LastPostCreateAt: model.GetMillis(),
LastPostUpdateAt: model.GetMillis(),
LastMembersSyncAt: 0,
}
if scr, err = scs.server.GetStore().SharedChannel().SaveRemote(scr); err != nil {
return fmt.Errorf("cannot auto-create shared channel remote (channel_id=%s, remote_id=%s): %w", task.channelID, rc.RemoteId, err)

View file

@ -507,3 +507,15 @@ type GroupMessageConversionRequestBody struct {
Name string `json:"name"`
DisplayName string `json:"display_name"`
}
// ChannelMembersGetOptions provides parameters for getting channel members
type ChannelMembersGetOptions struct {
// ChannelID specifies which channel to get members for
ChannelID string
// Offset for pagination
Offset int
// Limit for pagination (maximum number of results to return)
Limit int
// UpdatedAfter filters members updated after the given timestamp (cursor-based pagination)
UpdatedAfter int64
}

View file

@ -49,7 +49,8 @@ const (
PasswordMaximumLength = 72
PasswordMinimumLength = 5
ServiceGitlab = "gitlab"
ServiceGitlab = "gitlab"
ServiceGoogle = "google"
ServiceOffice365 = "office365"
ServiceOpenid = "openid"
@ -285,7 +286,8 @@ const (
LocalModeSocketPath = "/var/tmp/mattermost_local.socket"
ConnectedWorkspacesSettingsDefaultMaxPostsPerSync = 50 // a bit more than 4 typical screenfulls of posts
ConnectedWorkspacesSettingsDefaultMaxPostsPerSync = 50 // a bit more than 4 typical screenfulls of posts
ConnectedWorkspacesSettingsDefaultMemberSyncBatchSize = 20 // optimal batch size for syncing channel members
// These storage classes are the valid values for the x-amz-storage-class header. More documentation here https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass
StorageClassStandard = "STANDARD"
@ -3462,6 +3464,7 @@ type ConnectedWorkspacesSettings struct {
SyncUsersOnConnectionOpen *bool
GlobalUserSyncBatchSize *int
MaxPostsPerSync *int
MemberSyncBatchSize *int // Maximum number of members to process in a single batch during shared channel synchronization
}
func (c *ConnectedWorkspacesSettings) SetDefaults(isUpdate bool, e ExperimentalSettings) {
@ -3496,6 +3499,10 @@ func (c *ConnectedWorkspacesSettings) SetDefaults(isUpdate bool, e ExperimentalS
if c.MaxPostsPerSync == nil {
c.MaxPostsPerSync = NewPointer(ConnectedWorkspacesSettingsDefaultMaxPostsPerSync)
}
if c.MemberSyncBatchSize == nil {
c.MemberSyncBatchSize = NewPointer(ConnectedWorkspacesSettingsDefaultMemberSyncBatchSize)
}
}
type GlobalRelayMessageExportSettings struct {

View file

@ -25,6 +25,9 @@ type FeatureFlags struct {
// Enable plugins in shared channels.
EnableSharedChannelsPlugins bool
// Enable synchronization of channel members in shared channels
EnableSharedChannelsMemberSync bool
// Enable syncing all users for remote clusters in shared channels
EnableSyncAllUsersForRemoteCluster bool
@ -72,6 +75,7 @@ func (f *FeatureFlags) SetDefaults() {
f.TestBoolFeature = false
f.EnableRemoteClusterService = false
f.EnableSharedChannelsDMs = false
f.EnableSharedChannelsMemberSync = false
f.EnableSyncAllUsersForRemoteCluster = false
f.EnableSharedChannelsPlugins = true
f.AppsEnabled = false

View file

@ -118,6 +118,7 @@ type SharedChannelRemote struct {
LastPostUpdateID string `json:"last_post_id"`
LastPostCreateAt int64 `json:"last_post_create_at"`
LastPostCreateID string `json:"last_post_create_id"`
LastMembersSyncAt int64 `json:"last_members_sync_at"`
}
func (sc *SharedChannelRemote) IsValid() *AppError {
@ -169,12 +170,13 @@ type SharedChannelRemoteStatus struct {
// SharedChannelUser stores a lastSyncAt timestamp on behalf of a remote cluster for
// each user that has been synchronized.
type SharedChannelUser struct {
Id string `json:"id"`
UserId string `json:"user_id"`
ChannelId string `json:"channel_id"`
RemoteId string `json:"remote_id"`
CreateAt int64 `json:"create_at"`
LastSyncAt int64 `json:"last_sync_at"`
Id string `json:"id"`
UserId string `json:"user_id"`
ChannelId string `json:"channel_id"`
RemoteId string `json:"remote_id"`
CreateAt int64 `json:"create_at"`
LastSyncAt int64 `json:"last_sync_at"`
LastMembershipSyncAt int64 `json:"last_membership_sync_at"`
}
func (scu *SharedChannelUser) PreSave() {
@ -270,15 +272,25 @@ type SharedChannelRemoteFilterOpts struct {
IncludeDeleted bool
}
// MembershipChangeMsg represents a change in channel membership
type MembershipChangeMsg struct {
ChannelId string `json:"channel_id"`
UserId string `json:"user_id"`
IsAdd bool `json:"is_add"`
RemoteId string `json:"remote_id"`
ChangeTime int64 `json:"change_time"`
}
// SyncMsg represents a change in content (post add/edit/delete, reaction add/remove, users).
// It is sent to remote clusters as the payload of a `RemoteClusterMsg`.
type SyncMsg struct {
Id string `json:"id"`
ChannelId string `json:"channel_id"`
Users map[string]*User `json:"users,omitempty"`
Posts []*Post `json:"posts,omitempty"`
Reactions []*Reaction `json:"reactions,omitempty"`
Statuses []*Status `json:"statuses,omitempty"`
Id string `json:"id"`
ChannelId string `json:"channel_id"`
Users map[string]*User `json:"users,omitempty"`
Posts []*Post `json:"posts,omitempty"`
Reactions []*Reaction `json:"reactions,omitempty"`
Statuses []*Status `json:"statuses,omitempty"`
MembershipChanges []*MembershipChangeMsg `json:"membership_changes,omitempty"`
}
func NewSyncMsg(channelID string) *SyncMsg {