mirror of
https://github.com/mattermost/mattermost.git
synced 2026-05-28 04:35:04 -04:00
[AI assisted]: MM-62295: Search and index archived channels as well. (#29796)
We add a new field delete_at in the channels template. This field is then searched in the SearchChannels function. Also added tests to verify that archived channels are searched properly, and also indexed correctly. https://mattermost.atlassian.net/browse/MM-62295 ```release-note - Now archived channels are searchable with ES/OS if TeamSettings.ExperimentalViewArchivedChannels is enabled. - If there are old channels which were archived before a bulk index was run, users would need to purge indexes, and do bulk index again. Because those old archived channels are removed from the index when a bulk index is run. ``` Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
parent
57f13549d7
commit
afbd9d64c3
11 changed files with 158 additions and 60 deletions
|
|
@ -229,7 +229,7 @@ func (c *SearchChannelStore) AutocompleteInTeam(rctx request.CTX, teamID, userID
|
|||
}
|
||||
|
||||
func (c *SearchChannelStore) searchAutocompleteChannels(engine searchengine.SearchEngineInterface, teamId, userID, term string, includeDeleted, isGuest bool) (model.ChannelList, error) {
|
||||
channelIds, err := engine.SearchChannels(teamId, userID, term, isGuest)
|
||||
channelIds, err := engine.SearchChannels(teamId, userID, term, isGuest, includeDeleted)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -247,7 +247,7 @@ func (c *SearchChannelStore) searchAutocompleteChannels(engine searchengine.Sear
|
|||
}
|
||||
|
||||
func (c *SearchChannelStore) searchAutocompleteChannelsAllTeams(engine searchengine.SearchEngineInterface, userID, term string, includeDeleted, isGuest bool) (model.ChannelListWithTeamData, error) {
|
||||
channelIds, err := engine.SearchChannels("", userID, term, isGuest)
|
||||
channelIds, err := engine.SearchChannels("", userID, term, isGuest, includeDeleted)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ type ESFile struct {
|
|||
type ESChannel struct {
|
||||
Id string `json:"id"`
|
||||
Type model.ChannelType `json:"type"`
|
||||
DeleteAt int64 `json:"delete_at"`
|
||||
UserIDs []string `json:"user_ids"`
|
||||
TeamId string `json:"team_id"`
|
||||
TeamMemberIDs []string `json:"team_member_ids"`
|
||||
|
|
@ -206,6 +207,7 @@ func ESChannelFromChannel(channel *model.Channel, userIDs, teamMemberIDs []strin
|
|||
return &ESChannel{
|
||||
Id: channel.Id,
|
||||
Type: channel.Type,
|
||||
DeleteAt: channel.DeleteAt,
|
||||
UserIDs: userIDs,
|
||||
TeamId: channel.TeamId,
|
||||
TeamMemberIDs: teamMemberIDs,
|
||||
|
|
|
|||
|
|
@ -408,18 +408,18 @@ func (worker *IndexerWorker) BulkIndexPosts(posts []*model.PostForIndexing, prog
|
|||
|
||||
data, err := json.Marshal(searchPost)
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to marshal JSON, skipping this post.", mlog.String("post_id", post.Id))
|
||||
worker.logger.Warn("Failed to marshal JSON, skipping this post.", mlog.String("post_id", post.Id), mlog.Err(err))
|
||||
continue
|
||||
}
|
||||
|
||||
err = worker.addItemToBulkProcessor(indexName, indexOp, searchPost.Id, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName))
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err))
|
||||
}
|
||||
} else {
|
||||
err := worker.addItemToBulkProcessor(indexName, deleteOp, post.Id, nil)
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName))
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -484,18 +484,18 @@ func (worker *IndexerWorker) BulkIndexFiles(files []*model.FileForIndexing, prog
|
|||
|
||||
data, err := json.Marshal(searchFile)
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to marshal JSON")
|
||||
worker.logger.Warn("Failed to marshal JSON", mlog.Err(err))
|
||||
continue
|
||||
}
|
||||
|
||||
err = worker.addItemToBulkProcessor(indexName, indexOp, searchFile.Id, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName))
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err))
|
||||
}
|
||||
} else {
|
||||
err := worker.addItemToBulkProcessor(indexName, deleteOp, file.Id, nil)
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName))
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -556,42 +556,35 @@ func BulkIndexChannels(config *model.Config,
|
|||
logger mlog.LoggerIFace,
|
||||
addItemToBulkProcessorFn func(indexName string, indexOp string, docID string, body io.ReadSeeker) error,
|
||||
channels []*model.Channel,
|
||||
progress IndexingProgress) (*model.Channel, *model.AppError) {
|
||||
_ IndexingProgress) (*model.Channel, *model.AppError) {
|
||||
for _, channel := range channels {
|
||||
indexName := *config.ElasticsearchSettings.IndexPrefix + IndexBaseChannels
|
||||
|
||||
if channel.DeleteAt == 0 {
|
||||
var userIDs []string
|
||||
var err error
|
||||
if channel.Type == model.ChannelTypePrivate {
|
||||
userIDs, err = store.Channel().GetAllChannelMemberIdsByChannelId(channel.Id)
|
||||
if err != nil {
|
||||
return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllChannelMembers.error", nil, "", http.StatusInternalServerError).Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
teamMemberIDs, err := store.Channel().GetTeamMembersForChannel(channel.Id)
|
||||
var userIDs []string
|
||||
var err error
|
||||
if channel.Type == model.ChannelTypePrivate {
|
||||
userIDs, err = store.Channel().GetAllChannelMemberIdsByChannelId(channel.Id)
|
||||
if err != nil {
|
||||
return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllTeamMembers.error", nil, "", http.StatusInternalServerError).Wrap(err)
|
||||
return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllChannelMembers.error", nil, "", http.StatusInternalServerError).Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
searchChannel := ESChannelFromChannel(channel, userIDs, teamMemberIDs)
|
||||
teamMemberIDs, err := store.Channel().GetTeamMembersForChannel(channel.Id)
|
||||
if err != nil {
|
||||
return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllTeamMembers.error", nil, "", http.StatusInternalServerError).Wrap(err)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(searchChannel)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to marshal JSON")
|
||||
continue
|
||||
}
|
||||
searchChannel := ESChannelFromChannel(channel, userIDs, teamMemberIDs)
|
||||
|
||||
err = addItemToBulkProcessorFn(indexName, indexOp, searchChannel.Id, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName))
|
||||
}
|
||||
} else {
|
||||
err := addItemToBulkProcessorFn(indexName, deleteOp, channel.Id, nil)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName))
|
||||
}
|
||||
data, err := json.Marshal(searchChannel)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to marshal JSON", mlog.Err(err))
|
||||
continue
|
||||
}
|
||||
|
||||
err = addItemToBulkProcessorFn(indexName, indexOp, searchChannel.Id, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -653,13 +646,13 @@ func (worker *IndexerWorker) BulkIndexUsers(users []*model.UserForIndexing, prog
|
|||
|
||||
data, err := json.Marshal(searchUser)
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to marshal JSON")
|
||||
worker.logger.Warn("Failed to marshal JSON", mlog.Err(err))
|
||||
continue
|
||||
}
|
||||
|
||||
err = worker.addItemToBulkProcessor(indexName, indexOp, searchUser.Id, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName))
|
||||
worker.logger.Warn("Failed to add item to bulk processor", mlog.String("indexName", indexName), mlog.Err(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
64
server/enterprise/elasticsearch/common/indexing_job_test.go
Normal file
64
server/enterprise/elasticsearch/common/indexing_job_test.go
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.enterprise for license information.
|
||||
|
||||
package common
|
||||
|
||||
import (
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store/storetest/mocks"
|
||||
)
|
||||
|
||||
func TestBulkIndexChannelsWithDeletedChannels(t *testing.T) {
|
||||
// Create test channels - one active, one deleted
|
||||
activeChannel := &model.Channel{
|
||||
Id: "ch1",
|
||||
Type: model.ChannelTypeOpen,
|
||||
DeleteAt: 0,
|
||||
}
|
||||
deletedChannel := &model.Channel{
|
||||
Id: "ch2",
|
||||
Type: model.ChannelTypeOpen,
|
||||
DeleteAt: 123456,
|
||||
}
|
||||
channels := []*model.Channel{activeChannel, deletedChannel}
|
||||
|
||||
// Mock store
|
||||
mockStore := &mocks.Store{}
|
||||
mockChannelStore := &mocks.ChannelStore{}
|
||||
mockStore.On("Channel").Return(mockChannelStore)
|
||||
defer mockStore.AssertExpectations(t)
|
||||
|
||||
// Since these are open channels, GetAllChannelMemberIdsByChannelId won't be called
|
||||
// But GetTeamMembersForChannel will be called for both channels
|
||||
mockChannelStore.On("GetTeamMembersForChannel", "ch1").Return([]string{"team1"}, nil)
|
||||
mockChannelStore.On("GetTeamMembersForChannel", "ch2").Return([]string{"team1"}, nil)
|
||||
|
||||
// Track which channels were actually indexed
|
||||
indexedChannels := make(map[string]bool)
|
||||
|
||||
// Mock bulk processor function
|
||||
addItemToBulkProcessorFn := func(_, op, id string, _ io.ReadSeeker) error {
|
||||
assert.Equal(t, indexOp, op) // Should always be index, not delete
|
||||
indexedChannels[id] = true
|
||||
return nil
|
||||
}
|
||||
|
||||
config := &model.Config{}
|
||||
config.ElasticsearchSettings.IndexPrefix = model.NewPointer("test_")
|
||||
|
||||
// Call the function
|
||||
lastChannel, appErr := BulkIndexChannels(config, mockStore, mlog.CreateConsoleTestLogger(t), addItemToBulkProcessorFn, channels, IndexingProgress{})
|
||||
|
||||
// Verify results
|
||||
require.Nil(t, appErr)
|
||||
assert.Equal(t, deletedChannel, lastChannel)
|
||||
assert.True(t, indexedChannels["ch1"], "Active channel should be indexed")
|
||||
assert.True(t, indexedChannels["ch2"], "Deleted channel should also be indexed")
|
||||
}
|
||||
|
|
@ -181,6 +181,9 @@ func GetChannelTemplate(cfg *model.Config) *putindextemplate.Request {
|
|||
"type": types.KeywordProperty{
|
||||
Type: "keyword",
|
||||
},
|
||||
"delete_at": types.LongNumberProperty{
|
||||
Type: "long",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -770,20 +770,40 @@ func (c *CommonTestSuite) TestSearchChannels() {
|
|||
|
||||
c.NoError(c.RefreshIndexFn())
|
||||
|
||||
// Private channels should be returned for right user.
|
||||
ids, appErr := c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false)
|
||||
for _, includeDeleted := range []bool{true, false} {
|
||||
// Private channels should be returned for right user.
|
||||
ids, appErr := c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false, includeDeleted)
|
||||
c.Nil(appErr)
|
||||
c.Len(ids, 2)
|
||||
|
||||
// No private channels if user is guest
|
||||
ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", true, includeDeleted)
|
||||
c.Nil(appErr)
|
||||
c.Len(ids, 1)
|
||||
c.Equal(channel.Id, ids[0])
|
||||
|
||||
// No Private channels should be returned for wrong user.
|
||||
ids, appErr = c.ESImpl.SearchChannels("", "otheruser", "Channel", false, includeDeleted)
|
||||
c.Nil(appErr)
|
||||
c.Len(ids, 1)
|
||||
c.Equal(channel.Id, ids[0])
|
||||
}
|
||||
|
||||
// Adding a deleted channel
|
||||
channelDel := createChannel(c.TH.BasicTeam.Id, "channelD", "Channel Open- Deleted", model.ChannelTypeOpen)
|
||||
channelDel.DeleteAt = 123
|
||||
c.Nil(c.ESImpl.IndexChannel(c.TH.Context, channelDel, []string{}, []string{c.TH.BasicUser.Id, "otheruser"}))
|
||||
c.NoError(c.RefreshIndexFn())
|
||||
|
||||
ids, appErr := c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false, false)
|
||||
c.Nil(appErr)
|
||||
c.Len(ids, 2)
|
||||
|
||||
// No private channels if user is guest
|
||||
ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", true)
|
||||
ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Channel", false, true)
|
||||
c.Nil(appErr)
|
||||
c.Len(ids, 1)
|
||||
c.Equal(channel.Id, ids[0])
|
||||
c.Len(ids, 3)
|
||||
|
||||
// No Private channels should be returned for wrong user.
|
||||
ids, appErr = c.ESImpl.SearchChannels("", "otheruser", "Channel", false)
|
||||
ids, appErr = c.ESImpl.SearchChannels("", c.TH.BasicUser.Id, "Deleted", false, true)
|
||||
c.Nil(appErr)
|
||||
c.Len(ids, 1)
|
||||
c.Equal(channel.Id, ids[0])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -778,7 +778,7 @@ func (es *ElasticsearchInterfaceImpl) IndexChannel(rctx request.CTX, channel *mo
|
|||
return nil
|
||||
}
|
||||
|
||||
func (es *ElasticsearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest bool) ([]string, *model.AppError) {
|
||||
func (es *ElasticsearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest, includeDeleted bool) ([]string, *model.AppError) {
|
||||
es.mutex.RLock()
|
||||
defer es.mutex.RUnlock()
|
||||
|
||||
|
|
@ -843,6 +843,14 @@ func (es *ElasticsearchInterfaceImpl) SearchChannels(teamId, userID string, term
|
|||
})
|
||||
}
|
||||
|
||||
if !includeDeleted {
|
||||
query.Filter = append(query.Filter, types.Query{
|
||||
Term: map[string]types.TermQuery{
|
||||
"delete_at": {Value: 0},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
search := es.client.Search().
|
||||
Index(*es.Platform.Config().ElasticsearchSettings.IndexPrefix + common.IndexBaseChannels).
|
||||
Request(&search.Request{
|
||||
|
|
|
|||
|
|
@ -860,7 +860,7 @@ func (os *OpensearchInterfaceImpl) IndexChannel(rctx request.CTX, channel *model
|
|||
return nil
|
||||
}
|
||||
|
||||
func (os *OpensearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest bool) ([]string, *model.AppError) {
|
||||
func (os *OpensearchInterfaceImpl) SearchChannels(teamId, userID string, term string, isGuest, includeDeleted bool) ([]string, *model.AppError) {
|
||||
os.mutex.RLock()
|
||||
defer os.mutex.RUnlock()
|
||||
|
||||
|
|
@ -925,6 +925,14 @@ func (os *OpensearchInterfaceImpl) SearchChannels(teamId, userID string, term st
|
|||
})
|
||||
}
|
||||
|
||||
if !includeDeleted {
|
||||
query.Filter = append(query.Filter, types.Query{
|
||||
Term: map[string]types.TermQuery{
|
||||
"delete_at": {Value: 0},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
buf, err := json.Marshal(search.Request{
|
||||
Query: &types.Query{Bool: query},
|
||||
})
|
||||
|
|
|
|||
|
|
@ -315,7 +315,7 @@ func (b *BleveEngine) IndexChannel(_ request.CTX, channel *model.Channel, userID
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *BleveEngine) SearchChannels(teamId, userID, term string, isGuest bool) ([]string, *model.AppError) {
|
||||
func (b *BleveEngine) SearchChannels(teamId, userID, term string, isGuest, _ bool) ([]string, *model.AppError) {
|
||||
// This query essentially boils down to (if teamID is passed):
|
||||
// match teamID == <>
|
||||
// AND
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ type SearchEngineInterface interface {
|
|||
// IndexChannel indexes a given channel. The userIDs are only populated
|
||||
// for private channels.
|
||||
IndexChannel(rctx request.CTX, channel *model.Channel, userIDs, teamMemberIDs []string) *model.AppError
|
||||
SearchChannels(teamId, userID, term string, isGuest bool) ([]string, *model.AppError)
|
||||
SearchChannels(teamId, userID, term string, isGuest, includeDeleted bool) ([]string, *model.AppError)
|
||||
DeleteChannel(channel *model.Channel) *model.AppError
|
||||
IndexUser(rctx request.CTX, user *model.User, teamsIds, channelsIds []string) *model.AppError
|
||||
SearchUsersInChannel(teamId, channelId string, restrictedToChannels []string, term string, options *model.UserSearchOptions) ([]string, []string, *model.AppError)
|
||||
|
|
|
|||
|
|
@ -557,9 +557,9 @@ func (_m *SearchEngineInterface) RefreshIndexes(rctx request.CTX) *model.AppErro
|
|||
return r0
|
||||
}
|
||||
|
||||
// SearchChannels provides a mock function with given fields: teamId, userID, term, isGuest
|
||||
func (_m *SearchEngineInterface) SearchChannels(teamId string, userID string, term string, isGuest bool) ([]string, *model.AppError) {
|
||||
ret := _m.Called(teamId, userID, term, isGuest)
|
||||
// SearchChannels provides a mock function with given fields: teamId, userID, term, isGuest, includeDeleted
|
||||
func (_m *SearchEngineInterface) SearchChannels(teamId string, userID string, term string, isGuest bool, includeDeleted bool) ([]string, *model.AppError) {
|
||||
ret := _m.Called(teamId, userID, term, isGuest, includeDeleted)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for SearchChannels")
|
||||
|
|
@ -567,19 +567,19 @@ func (_m *SearchEngineInterface) SearchChannels(teamId string, userID string, te
|
|||
|
||||
var r0 []string
|
||||
var r1 *model.AppError
|
||||
if rf, ok := ret.Get(0).(func(string, string, string, bool) ([]string, *model.AppError)); ok {
|
||||
return rf(teamId, userID, term, isGuest)
|
||||
if rf, ok := ret.Get(0).(func(string, string, string, bool, bool) ([]string, *model.AppError)); ok {
|
||||
return rf(teamId, userID, term, isGuest, includeDeleted)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(string, string, string, bool) []string); ok {
|
||||
r0 = rf(teamId, userID, term, isGuest)
|
||||
if rf, ok := ret.Get(0).(func(string, string, string, bool, bool) []string); ok {
|
||||
r0 = rf(teamId, userID, term, isGuest, includeDeleted)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]string)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(string, string, string, bool) *model.AppError); ok {
|
||||
r1 = rf(teamId, userID, term, isGuest)
|
||||
if rf, ok := ret.Get(1).(func(string, string, string, bool, bool) *model.AppError); ok {
|
||||
r1 = rf(teamId, userID, term, isGuest, includeDeleted)
|
||||
} else {
|
||||
if ret.Get(1) != nil {
|
||||
r1 = ret.Get(1).(*model.AppError)
|
||||
|
|
|
|||
Loading…
Reference in a new issue