From afbd9d64c3386119df0b22f05150a4cd192f86fe Mon Sep 17 00:00:00 2001 From: Agniva De Sarker Date: Tue, 28 Jan 2025 19:44:55 +0530 Subject: [PATCH] [AI assisted]: MM-62295: Search and index archived channels as well. (#29796) We add a new field delete_at in the channels template. This field is then searched in the SearchChannels function. Also added tests to verify that archived channels are searched properly, and also indexed correctly. https://mattermost.atlassian.net/browse/MM-62295 ```release-note - Archived channels are now searchable with ES/OS if TeamSettings.ExperimentalViewArchivedChannels is enabled. - If there are old channels which were archived before a bulk index was run, users will need to purge the indexes and run a bulk index again, because those old archived channels were removed from the index when that earlier bulk index was run. ``` Co-authored-by: Mattermost Build --- .../store/searchlayer/channel_layer.go | 4 +- .../enterprise/elasticsearch/common/common.go | 2 + .../elasticsearch/common/indexing_job.go | 65 +++++++++---------- .../elasticsearch/common/indexing_job_test.go | 64 ++++++++++++++++++ .../elasticsearch/common/templates.go | 3 + .../elasticsearch/common/test_suite.go | 38 ++++++++--- .../elasticsearch/elasticsearch.go | 10 ++- .../elasticsearch/opensearch/opensearch.go | 10 ++- .../searchengine/bleveengine/search.go | 2 +- .../services/searchengine/interface.go | 2 +- .../mocks/SearchEngineInterface.go | 18 ++--- 11 files changed, 158 insertions(+), 60 deletions(-) create mode 100644 server/enterprise/elasticsearch/common/indexing_job_test.go diff --git a/server/channels/store/searchlayer/channel_layer.go b/server/channels/store/searchlayer/channel_layer.go index 71da71c1108..b56541e345f 100644 --- a/server/channels/store/searchlayer/channel_layer.go +++ b/server/channels/store/searchlayer/channel_layer.go @@ -229,7 +229,7 @@ func (c *SearchChannelStore) AutocompleteInTeam(rctx request.CTX, teamID, userID } func (c *SearchChannelStore) 
searchAutocompleteChannels(engine searchengine.SearchEngineInterface, teamId, userID, term string, includeDeleted, isGuest bool) (model.ChannelList, error) { - channelIds, err := engine.SearchChannels(teamId, userID, term, isGuest) + channelIds, err := engine.SearchChannels(teamId, userID, term, isGuest, includeDeleted) if err != nil { return nil, err } @@ -247,7 +247,7 @@ func (c *SearchChannelStore) searchAutocompleteChannels(engine searchengine.Sear } func (c *SearchChannelStore) searchAutocompleteChannelsAllTeams(engine searchengine.SearchEngineInterface, userID, term string, includeDeleted, isGuest bool) (model.ChannelListWithTeamData, error) { - channelIds, err := engine.SearchChannels("", userID, term, isGuest) + channelIds, err := engine.SearchChannels("", userID, term, isGuest, includeDeleted) if err != nil { return nil, err } diff --git a/server/enterprise/elasticsearch/common/common.go b/server/enterprise/elasticsearch/common/common.go index 77e9f0b400e..0e0f8fd1896 100644 --- a/server/enterprise/elasticsearch/common/common.go +++ b/server/enterprise/elasticsearch/common/common.go @@ -68,6 +68,7 @@ type ESFile struct { type ESChannel struct { Id string `json:"id"` Type model.ChannelType `json:"type"` + DeleteAt int64 `json:"delete_at"` UserIDs []string `json:"user_ids"` TeamId string `json:"team_id"` TeamMemberIDs []string `json:"team_member_ids"` @@ -206,6 +207,7 @@ func ESChannelFromChannel(channel *model.Channel, userIDs, teamMemberIDs []strin return &ESChannel{ Id: channel.Id, Type: channel.Type, + DeleteAt: channel.DeleteAt, UserIDs: userIDs, TeamId: channel.TeamId, TeamMemberIDs: teamMemberIDs, diff --git a/server/enterprise/elasticsearch/common/indexing_job.go b/server/enterprise/elasticsearch/common/indexing_job.go index 0844d9d170b..851c1c8779e 100644 --- a/server/enterprise/elasticsearch/common/indexing_job.go +++ b/server/enterprise/elasticsearch/common/indexing_job.go @@ -408,18 +408,18 @@ func (worker *IndexerWorker) BulkIndexPosts(posts 
[]*model.PostForIndexing, prog data, err := json.Marshal(searchPost) if err != nil { - worker.logger.Warn("Failed to marshal JSON, skipping this post.", mlog.String("post_id", post.Id)) + worker.logger.Warn("Failed to marshal JSON, skipping this post.", mlog.String("post_id", post.Id), mlog.Err(err)) continue } err = worker.addItemToBulkProcessor(indexName, indexOp, searchPost.Id, bytes.NewReader(data)) if err != nil { - worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName)) + worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err)) } } else { err := worker.addItemToBulkProcessor(indexName, deleteOp, post.Id, nil) if err != nil { - worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName)) + worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err)) } } } @@ -484,18 +484,18 @@ func (worker *IndexerWorker) BulkIndexFiles(files []*model.FileForIndexing, prog data, err := json.Marshal(searchFile) if err != nil { - worker.logger.Warn("Failed to marshal JSON") + worker.logger.Warn("Failed to marshal JSON", mlog.Err(err)) continue } err = worker.addItemToBulkProcessor(indexName, indexOp, searchFile.Id, bytes.NewReader(data)) if err != nil { - worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName)) + worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err)) } } else { err := worker.addItemToBulkProcessor(indexName, deleteOp, file.Id, nil) if err != nil { - worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName)) + worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err)) } } } @@ -556,42 +556,35 @@ func BulkIndexChannels(config *model.Config, logger mlog.LoggerIFace, addItemToBulkProcessorFn func(indexName 
string, indexOp string, docID string, body io.ReadSeeker) error, channels []*model.Channel, - progress IndexingProgress) (*model.Channel, *model.AppError) { + _ IndexingProgress) (*model.Channel, *model.AppError) { for _, channel := range channels { indexName := *config.ElasticsearchSettings.IndexPrefix + IndexBaseChannels - if channel.DeleteAt == 0 { - var userIDs []string - var err error - if channel.Type == model.ChannelTypePrivate { - userIDs, err = store.Channel().GetAllChannelMemberIdsByChannelId(channel.Id) - if err != nil { - return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllChannelMembers.error", nil, "", http.StatusInternalServerError).Wrap(err) - } - } - - teamMemberIDs, err := store.Channel().GetTeamMembersForChannel(channel.Id) + var userIDs []string + var err error + if channel.Type == model.ChannelTypePrivate { + userIDs, err = store.Channel().GetAllChannelMemberIdsByChannelId(channel.Id) if err != nil { - return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllTeamMembers.error", nil, "", http.StatusInternalServerError).Wrap(err) + return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllChannelMembers.error", nil, "", http.StatusInternalServerError).Wrap(err) } + } - searchChannel := ESChannelFromChannel(channel, userIDs, teamMemberIDs) + teamMemberIDs, err := store.Channel().GetTeamMembersForChannel(channel.Id) + if err != nil { + return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllTeamMembers.error", nil, "", http.StatusInternalServerError).Wrap(err) + } - data, err := json.Marshal(searchChannel) - if err != nil { - logger.Warn("Failed to marshal JSON") - continue - } + searchChannel := ESChannelFromChannel(channel, userIDs, teamMemberIDs) - err = addItemToBulkProcessorFn(indexName, indexOp, searchChannel.Id, bytes.NewReader(data)) - if err != nil { - logger.Warn("Failed to add item to bulk processor", 
mlog.String("indexName", indexName)) - } - } else { - err := addItemToBulkProcessorFn(indexName, deleteOp, channel.Id, nil) - if err != nil { - logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName)) - } + data, err := json.Marshal(searchChannel) + if err != nil { + logger.Warn("Failed to marshal JSON", mlog.Err(err)) + continue + } + + err = addItemToBulkProcessorFn(indexName, indexOp, searchChannel.Id, bytes.NewReader(data)) + if err != nil { + logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err)) } } @@ -653,13 +646,13 @@ func (worker *IndexerWorker) BulkIndexUsers(users []*model.UserForIndexing, prog data, err := json.Marshal(searchUser) if err != nil { - worker.logger.Warn("Failed to marshal JSON") + worker.logger.Warn("Failed to marshal JSON", mlog.Err(err)) continue } err = worker.addItemToBulkProcessor(indexName, indexOp, searchUser.Id, bytes.NewReader(data)) if err != nil { - worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName)) + worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err)) } } diff --git a/server/enterprise/elasticsearch/common/indexing_job_test.go b/server/enterprise/elasticsearch/common/indexing_job_test.go new file mode 100644 index 00000000000..24c302f1914 --- /dev/null +++ b/server/enterprise/elasticsearch/common/indexing_job_test.go @@ -0,0 +1,64 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.enterprise for license information. 
+ +package common + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/v8/channels/store/storetest/mocks" +) + +func TestBulkIndexChannelsWithDeletedChannels(t *testing.T) { + // Create test channels - one active, one deleted + activeChannel := &model.Channel{ + Id: "ch1", + Type: model.ChannelTypeOpen, + DeleteAt: 0, + } + deletedChannel := &model.Channel{ + Id: "ch2", + Type: model.ChannelTypeOpen, + DeleteAt: 123456, + } + channels := []*model.Channel{activeChannel, deletedChannel} + + // Mock store + mockStore := &mocks.Store{} + mockChannelStore := &mocks.ChannelStore{} + mockStore.On("Channel").Return(mockChannelStore) + defer mockStore.AssertExpectations(t) + + // Since these are open channels, GetAllChannelMemberIdsByChannelId won't be called + // But GetTeamMembersForChannel will be called for both channels + mockChannelStore.On("GetTeamMembersForChannel", "ch1").Return([]string{"team1"}, nil) + mockChannelStore.On("GetTeamMembersForChannel", "ch2").Return([]string{"team1"}, nil) + + // Track which channels were actually indexed + indexedChannels := make(map[string]bool) + + // Mock bulk processor function + addItemToBulkProcessorFn := func(_, op, id string, _ io.ReadSeeker) error { + assert.Equal(t, indexOp, op) // Should always be index, not delete + indexedChannels[id] = true + return nil + } + + config := &model.Config{} + config.ElasticsearchSettings.IndexPrefix = model.NewPointer("test_") + + // Call the function + lastChannel, appErr := BulkIndexChannels(config, mockStore, mlog.CreateConsoleTestLogger(t), addItemToBulkProcessorFn, channels, IndexingProgress{}) + + // Verify results + require.Nil(t, appErr) + assert.Equal(t, deletedChannel, lastChannel) + assert.True(t, indexedChannels["ch1"], "Active channel should be indexed") + 
assert.True(t, indexedChannels["ch2"], "Deleted channel should also be indexed") +} diff --git a/server/enterprise/elasticsearch/common/templates.go b/server/enterprise/elasticsearch/common/templates.go index f0612603b20..5de367e1d81 100644 --- a/server/enterprise/elasticsearch/common/templates.go +++ b/server/enterprise/elasticsearch/common/templates.go @@ -181,6 +181,9 @@ func GetChannelTemplate(cfg *model.Config) *putindextemplate.Request { "type": types.KeywordProperty{ Type: "keyword", }, + "delete_at": types.LongNumberProperty{ + Type: "long", + }, }, } diff --git a/server/enterprise/elasticsearch/common/test_suite.go b/server/enterprise/elasticsearch/common/test_suite.go index 539c8cb232f..e29b51dee0f 100644 --- a/server/enterprise/elasticsearch/common/test_suite.go +++ b/server/enterprise/elasticsearch/common/test_suite.go @@ -770,20 +770,40 @@ func (c *CommonTestSuite) TestSearchChannels() { c.NoError(c.RefreshIndexFn()) - // Private channels should be returned for right user. - ids, appErr := c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false) + for _, includeDeleted := range []bool{true, false} { + // Private channels should be returned for right user. + ids, appErr := c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false, includeDeleted) + c.Nil(appErr) + c.Len(ids, 2) + + // No private channels if user is guest + ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", true, includeDeleted) + c.Nil(appErr) + c.Len(ids, 1) + c.Equal(channel.Id, ids[0]) + + // No Private channels should be returned for wrong user. 
+ ids, appErr = c.ESImpl.SearchChannels("", "otheruser", "Channel", false, includeDeleted) + c.Nil(appErr) + c.Len(ids, 1) + c.Equal(channel.Id, ids[0]) + } + + // Adding a deleted channel + channelDel := createChannel(c.TH.BasicTeam.Id, "channelD", "Channel Open- Deleted", model.ChannelTypeOpen) + channelDel.DeleteAt = 123 + c.Nil(c.ESImpl.IndexChannel(c.TH.Context, channelDel, []string{}, []string{c.TH.BasicUser.Id, "otheruser"})) + c.NoError(c.RefreshIndexFn()) + + ids, appErr := c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false, false) c.Nil(appErr) c.Len(ids, 2) - // No private channels if user is guest - ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", true) + ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false, true) c.Nil(appErr) - c.Len(ids, 1) - c.Equal(channel.Id, ids[0]) + c.Len(ids, 3) - // No Private channels should be returned for wrong user. - ids, appErr = c.ESImpl.SearchChannels("", "otheruser", "Channel", false) + ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Deleted", false, true) c.Nil(appErr) c.Len(ids, 1) - c.Equal(channel.Id, ids[0]) } diff --git a/server/enterprise/elasticsearch/elasticsearch/elasticsearch.go b/server/enterprise/elasticsearch/elasticsearch/elasticsearch.go index 7bb5b11f532..b6e6506fe96 100644 --- a/server/enterprise/elasticsearch/elasticsearch/elasticsearch.go +++ b/server/enterprise/elasticsearch/elasticsearch/elasticsearch.go @@ -778,7 +778,7 @@ func (es *ElasticsearchInterfaceImpl) IndexChannel(rctx request.CTX, channel *mo return nil } -func (es *ElasticsearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest bool) ([]string, *model.AppError) { +func (es *ElasticsearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest, includeDeleted bool) ([]string, *model.AppError) { es.mutex.RLock() defer es.mutex.RUnlock() @@ -843,6 +843,14 @@ func (es *ElasticsearchInterfaceImpl) SearchChannels(teamId, 
userID string, term }) } + if !includeDeleted { + query.Filter = append(query.Filter, types.Query{ + Term: map[string]types.TermQuery{ + "delete_at": {Value: 0}, + }, + }) + } + search := es.client.Search(). Index(*es.Platform.Config().ElasticsearchSettings.IndexPrefix + common.IndexBaseChannels). Request(&search.Request{ diff --git a/server/enterprise/elasticsearch/opensearch/opensearch.go b/server/enterprise/elasticsearch/opensearch/opensearch.go index 8f56342b25c..329f447faed 100644 --- a/server/enterprise/elasticsearch/opensearch/opensearch.go +++ b/server/enterprise/elasticsearch/opensearch/opensearch.go @@ -860,7 +860,7 @@ func (os *OpensearchInterfaceImpl) IndexChannel(rctx request.CTX, channel *model return nil } -func (os *OpensearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest bool) ([]string, *model.AppError) { +func (os *OpensearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest, includeDeleted bool) ([]string, *model.AppError) { os.mutex.RLock() defer os.mutex.RUnlock() @@ -925,6 +925,14 @@ func (os *OpensearchInterfaceImpl) SearchChannels(teamId, userID string, term st }) } + if !includeDeleted { + query.Filter = append(query.Filter, types.Query{ + Term: map[string]types.TermQuery{ + "delete_at": {Value: 0}, + }, + }) + } + buf, err := json.Marshal(search.Request{ Query: &types.Query{Bool: query}, }) diff --git a/server/platform/services/searchengine/bleveengine/search.go b/server/platform/services/searchengine/bleveengine/search.go index a93e9ee577c..aaa6b012711 100644 --- a/server/platform/services/searchengine/bleveengine/search.go +++ b/server/platform/services/searchengine/bleveengine/search.go @@ -315,7 +315,7 @@ func (b *BleveEngine) IndexChannel(_ request.CTX, channel *model.Channel, userID return nil } -func (b *BleveEngine) SearchChannels(teamId, userID, term string, isGuest bool) ([]string, *model.AppError) { +func (b *BleveEngine) SearchChannels(teamId, userID, term string, isGuest, _ 
bool) ([]string, *model.AppError) { // This query essentially boils down to (if teamID is passed): // match teamID == <> // AND diff --git a/server/platform/services/searchengine/interface.go b/server/platform/services/searchengine/interface.go index 1c03b42862d..8d8ea1d5944 100644 --- a/server/platform/services/searchengine/interface.go +++ b/server/platform/services/searchengine/interface.go @@ -33,7 +33,7 @@ type SearchEngineInterface interface { // IndexChannel indexes a given channel. The userIDs are only populated // for private channels. IndexChannel(rctx request.CTX, channel *model.Channel, userIDs, teamMemberIDs []string) *model.AppError - SearchChannels(teamId, userID, term string, isGuest bool) ([]string, *model.AppError) + SearchChannels(teamId, userID, term string, isGuest, includeDeleted bool) ([]string, *model.AppError) DeleteChannel(channel *model.Channel) *model.AppError IndexUser(rctx request.CTX, user *model.User, teamsIds, channelsIds []string) *model.AppError SearchUsersInChannel(teamId, channelId string, restrictedToChannels []string, term string, options *model.UserSearchOptions) ([]string, []string, *model.AppError) diff --git a/server/platform/services/searchengine/mocks/SearchEngineInterface.go b/server/platform/services/searchengine/mocks/SearchEngineInterface.go index 14f0afa5397..9baa6eaece5 100644 --- a/server/platform/services/searchengine/mocks/SearchEngineInterface.go +++ b/server/platform/services/searchengine/mocks/SearchEngineInterface.go @@ -557,9 +557,9 @@ func (_m *SearchEngineInterface) RefreshIndexes(rctx request.CTX) *model.AppErro return r0 } -// SearchChannels provides a mock function with given fields: teamId, userID, term, isGuest -func (_m *SearchEngineInterface) SearchChannels(teamId string, userID string, term string, isGuest bool) ([]string, *model.AppError) { - ret := _m.Called(teamId, userID, term, isGuest) +// SearchChannels provides a mock function with given fields: teamId, userID, term, isGuest, includeDeleted 
+func (_m *SearchEngineInterface) SearchChannels(teamId string, userID string, term string, isGuest bool, includeDeleted bool) ([]string, *model.AppError) { + ret := _m.Called(teamId, userID, term, isGuest, includeDeleted) if len(ret) == 0 { panic("no return value specified for SearchChannels") @@ -567,19 +567,19 @@ func (_m *SearchEngineInterface) SearchChannels(teamId string, userID string, te var r0 []string var r1 *model.AppError - if rf, ok := ret.Get(0).(func(string, string, string, bool) ([]string, *model.AppError)); ok { - return rf(teamId, userID, term, isGuest) + if rf, ok := ret.Get(0).(func(string, string, string, bool, bool) ([]string, *model.AppError)); ok { + return rf(teamId, userID, term, isGuest, includeDeleted) } - if rf, ok := ret.Get(0).(func(string, string, string, bool) []string); ok { - r0 = rf(teamId, userID, term, isGuest) + if rf, ok := ret.Get(0).(func(string, string, string, bool, bool) []string); ok { + r0 = rf(teamId, userID, term, isGuest, includeDeleted) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]string) } } - if rf, ok := ret.Get(1).(func(string, string, string, bool) *model.AppError); ok { - r1 = rf(teamId, userID, term, isGuest) + if rf, ok := ret.Get(1).(func(string, string, string, bool, bool) *model.AppError); ok { + r1 = rf(teamId, userID, term, isGuest, includeDeleted) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(*model.AppError)