MM-11886: materialize channel search (#9349)

* materialize PublicChannels table

Introduce triggers for each supported database that automatically maintain a subset of the Channels table corresponding to only public channels. This improves corresponding queries that no longer need to filter out 99% DM channels.

This initial commit modifies the channel store directly for easier code reviewing, but the next wraps an experimental version around it to enable a kill switch in case there are unforeseen performance regressions.

This addresses [MM-11886](https://mattermost.atlassian.net/browse/MM-11886) and [MM-11945](https://mattermost.atlassian.net/browse/MM-11945).

* extract the experimental public channels materialization

Wrap the original channel store with an experimental version that
leverages the materialized public channels, but can be disabled to
fallback to the original implementation.

This addresses MM-11947.

* s/ExperimentalPublicChannelsMaterialization/EnablePublicChannelsMaterialization/

* simplify error handling

* move experimental config listener until after store is initialized
This commit is contained in:
Jesse Hallam 2018-09-13 13:47:17 -04:00 committed by GitHub
parent 0a5f792d2d
commit 8b17bf9e42
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1794 additions and 628 deletions

View file

@ -212,6 +212,14 @@ func New(options ...Option) (outApp *App, outErr error) {
}
app.Srv.Store = app.newStore()
app.AddConfigListener(func(_, current *model.Config) {
if current.SqlSettings.EnablePublicChannelsMaterialization != nil && !*current.SqlSettings.EnablePublicChannelsMaterialization {
app.Srv.Store.Channel().DisableExperimentalPublicChannelsMaterialization()
} else {
app.Srv.Store.Channel().EnableExperimentalPublicChannelsMaterialization()
}
})
if err := app.ensureAsymmetricSigningKey(); err != nil {
return nil, errors.Wrapf(err, "unable to ensure asymmetric signing key")
}

View file

@ -130,7 +130,8 @@
"MaxOpenConns": 300,
"Trace": false,
"AtRestEncryptKey": "",
"QueryTimeout": 30
"QueryTimeout": 30,
"EnablePublicChannelsMaterialization": true
},
"LogSettings": {
"EnableConsole": true,

View file

@ -644,16 +644,17 @@ type SSOSettings struct {
}
type SqlSettings struct {
DriverName *string
DataSource *string
DataSourceReplicas []string
DataSourceSearchReplicas []string
MaxIdleConns *int
ConnMaxLifetimeMilliseconds *int
MaxOpenConns *int
Trace bool
AtRestEncryptKey string
QueryTimeout *int
DriverName *string
DataSource *string
DataSourceReplicas []string
DataSourceSearchReplicas []string
MaxIdleConns *int
ConnMaxLifetimeMilliseconds *int
MaxOpenConns *int
Trace bool
AtRestEncryptKey string
QueryTimeout *int
EnablePublicChannelsMaterialization *bool
}
func (s *SqlSettings) SetDefaults() {
@ -684,6 +685,10 @@ func (s *SqlSettings) SetDefaults() {
if s.QueryTimeout == nil {
s.QueryTimeout = NewInt(30)
}
if s.EnablePublicChannelsMaterialization == nil {
s.EnablePublicChannelsMaterialization = NewBool(true)
}
}
type LogSettings struct {

View file

@ -301,6 +301,21 @@ func (s SqlChannelStore) CreateIndexesIfNotExists() {
s.CreateFullTextIndexIfNotExists("idx_channel_search_txt", "Channels", "Name, DisplayName, Purpose")
}
func (s SqlChannelStore) CreateTriggersIfNotExists() error {
// See SqlChannelStoreExperimental
return nil
}
func (s SqlChannelStore) MigratePublicChannels() error {
// See SqlChannelStoreExperimental
return nil
}
func (s SqlChannelStore) DropPublicChannels() error {
// See SqlChannelStoreExperimental
return nil
}
func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if channel.DeleteAt != 0 {
@ -804,12 +819,12 @@ func (s SqlChannelStore) GetTeamChannels(teamId string) store.StoreChannel {
_, err := s.GetReplica().Select(data, "SELECT * FROM Channels WHERE TeamId = :TeamId And Type != 'D' ORDER BY DisplayName", map[string]interface{}{"TeamId": teamId})
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.GetChannels", "store.sql_channel.get_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError)
result.Err = model.NewAppError("SqlChannelStore.GetTeamChannels", "store.sql_channel.get_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError)
return
}
if len(*data) == 0 {
result.Err = model.NewAppError("SqlChannelStore.GetChannels", "store.sql_channel.get_channels.not_found.app_error", nil, "teamId="+teamId, http.StatusNotFound)
result.Err = model.NewAppError("SqlChannelStore.GetTeamChannels", "store.sql_channel.get_channels.not_found.app_error", nil, "teamId="+teamId, http.StatusNotFound)
return
}
@ -962,16 +977,16 @@ var CHANNEL_MEMBERS_WITH_SCHEME_SELECT_QUERY = `
TeamScheme.DefaultChannelAdminRole TeamSchemeDefaultAdminRole,
ChannelScheme.DefaultChannelUserRole ChannelSchemeDefaultUserRole,
ChannelScheme.DefaultChannelAdminRole ChannelSchemeDefaultAdminRole
FROM
FROM
ChannelMembers
INNER JOIN
INNER JOIN
Channels ON ChannelMembers.ChannelId = Channels.Id
LEFT JOIN
Schemes ChannelScheme ON Channels.SchemeId = ChannelScheme.Id
LEFT JOIN
Teams ON Channels.TeamId = Teams.Id
LEFT JOIN
Schemes TeamScheme ON Teams.SchemeId = TeamScheme.Id
Schemes TeamScheme ON Teams.SchemeId = TeamScheme.Id
`
func (s SqlChannelStore) SaveMember(member *model.ChannelMember) store.StoreChannel {
@ -1988,3 +2003,16 @@ func (s SqlChannelStore) ResetLastPostAt() store.StoreChannel {
}
})
}
func (s SqlChannelStore) EnableExperimentalPublicChannelsMaterialization() {
// See SqlChannelStoreExperimental
}
func (s SqlChannelStore) DisableExperimentalPublicChannelsMaterialization() {
// See SqlChannelStoreExperimental
}
func (s SqlChannelStore) IsExperimentalPublicChannelsMaterializationEnabled() bool {
// See SqlChannelStoreExperimental
return false
}

View file

@ -0,0 +1,819 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package sqlstore
import (
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync/atomic"
"github.com/pkg/errors"
"github.com/mattermost/mattermost-server/einterfaces"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
)
// publicChannel is a subset of the metadata corresponding to public channels only.
type publicChannel struct {
Id string `json:"id"`
DeleteAt int64 `json:"delete_at"`
TeamId string `json:"team_id"`
DisplayName string `json:"display_name"`
Name string `json:"name"`
Header string `json:"header"`
Purpose string `json:"purpose"`
}
type SqlChannelStoreExperimental struct {
SqlChannelStore
experimentalPublicChannelsMaterializationDisabled *uint32
}
func NewSqlChannelStoreExperimental(sqlStore SqlStore, metrics einterfaces.MetricsInterface, enabled bool) store.ChannelStore {
s := &SqlChannelStoreExperimental{
SqlChannelStore: *NewSqlChannelStore(sqlStore, metrics).(*SqlChannelStore),
experimentalPublicChannelsMaterializationDisabled: new(uint32),
}
if enabled {
// Forcibly log, since the default state is enabled and we want this on startup.
mlog.Info("Enabling experimental public channels materialization")
s.EnableExperimentalPublicChannelsMaterialization()
} else {
s.DisableExperimentalPublicChannelsMaterialization()
}
if s.IsExperimentalPublicChannelsMaterializationEnabled() {
for _, db := range sqlStore.GetAllConns() {
tablePublicChannels := db.AddTableWithName(publicChannel{}, "PublicChannels").SetKeys(false, "Id")
tablePublicChannels.ColMap("Id").SetMaxSize(26)
tablePublicChannels.ColMap("TeamId").SetMaxSize(26)
tablePublicChannels.ColMap("DisplayName").SetMaxSize(64)
tablePublicChannels.ColMap("Name").SetMaxSize(64)
tablePublicChannels.SetUniqueTogether("Name", "TeamId")
tablePublicChannels.ColMap("Header").SetMaxSize(1024)
tablePublicChannels.ColMap("Purpose").SetMaxSize(250)
}
}
return s
}
// migratePublicChannels initializes the PublicChannels table with data created before the triggers
// took over keeping it up-to-date.
func (s SqlChannelStoreExperimental) MigratePublicChannels() error {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.MigratePublicChannels()
}
transaction, err := s.GetMaster().Begin()
if err != nil {
return err
}
if _, err := transaction.Exec(`
INSERT INTO PublicChannels
(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
SELECT
c.Id, c.DeleteAt, c.TeamId, c.DisplayName, c.Name, c.Header, c.Purpose
FROM
Channels c
LEFT JOIN
PublicChannels pc ON (pc.Id = c.Id)
WHERE
c.Type = 'O'
AND pc.Id IS NULL
`); err != nil {
return err
}
if err := transaction.Commit(); err != nil {
return err
}
return nil
}
// DropPublicChannels removes the public channels table and all associated triggers.
func (s SqlChannelStoreExperimental) DropPublicChannels() error {
// Only PostgreSQL will honour the transaction when executing the DDL changes below.
transaction, err := s.GetMaster().Begin()
if err != nil {
return err
}
if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels ON Channels
`); err != nil {
return err
}
if _, err := transaction.Exec(`
DROP FUNCTION IF EXISTS channels_copy_to_public_channels
`); err != nil {
return err
}
} else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels_insert
`); err != nil {
return err
}
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels_update
`); err != nil {
return err
}
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels_delete
`); err != nil {
return err
}
} else if s.DriverName() == model.DATABASE_DRIVER_SQLITE {
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels_insert
`); err != nil {
return err
}
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels_update_delete
`); err != nil {
return err
}
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels_update
`); err != nil {
return err
}
if _, err := transaction.Exec(`
DROP TRIGGER IF EXISTS trigger_channels_delete
`); err != nil {
return err
}
} else {
return errors.New("failed to create trigger because of missing driver")
}
if _, err := transaction.Exec(`
DROP TABLE IF EXISTS PublicChannels
`); err != nil {
return err
}
if err := transaction.Commit(); err != nil {
return err
}
return nil
}
func (s SqlChannelStoreExperimental) CreateIndexesIfNotExists() {
s.SqlChannelStore.CreateIndexesIfNotExists()
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return
}
s.CreateIndexIfNotExists("idx_publicchannels_team_id", "PublicChannels", "TeamId")
s.CreateIndexIfNotExists("idx_publicchannels_name", "PublicChannels", "Name")
s.CreateIndexIfNotExists("idx_publicchannels_delete_at", "PublicChannels", "DeleteAt")
if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
s.CreateIndexIfNotExists("idx_publicchannels_name_lower", "PublicChannels", "lower(Name)")
s.CreateIndexIfNotExists("idx_publicchannels_displayname_lower", "PublicChannels", "lower(DisplayName)")
}
s.CreateFullTextIndexIfNotExists("idx_publicchannels_search_txt", "PublicChannels", "Name, DisplayName, Purpose")
}
func (s SqlChannelStoreExperimental) CreateTriggersIfNotExists() error {
s.SqlChannelStore.CreateTriggersIfNotExists()
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return nil
}
if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
if !s.DoesTriggerExist("trigger_channels") {
transaction, err := s.GetMaster().Begin()
if err != nil {
return errors.Wrap(err, "failed to create trigger function")
}
if _, err := transaction.ExecNoTimeout(`
CREATE OR REPLACE FUNCTION channels_copy_to_public_channels() RETURNS TRIGGER
SECURITY DEFINER
LANGUAGE plpgsql
AS $$
DECLARE
counter int := 0;
BEGIN
IF (TG_OP = 'DELETE' AND OLD.Type = 'O') OR (TG_OP = 'UPDATE' AND NEW.Type != 'O') THEN
DELETE FROM
PublicChannels
WHERE
Id = OLD.Id;
ELSEIF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') AND NEW.Type = 'O' THEN
UPDATE
PublicChannels
SET
DeleteAt = NEW.DeleteAt,
TeamId = NEW.TeamId,
DisplayName = NEW.DisplayName,
Name = NEW.Name,
Header = NEW.Header,
Purpose = NEW.Purpose
WHERE
Id = NEW.Id;
-- There's a race condition here where the INSERT might fail, though this should only occur
-- if PublicChannels had been modified outside of the triggers. We could improve this with
-- the UPSERT functionality in Postgres 9.5+ once we support same.
IF NOT FOUND THEN
INSERT INTO
PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
VALUES
(NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose);
END IF;
END IF;
RETURN NULL;
END
$$;
`); err != nil {
return errors.Wrap(err, "failed to create trigger function")
}
if _, err := transaction.ExecNoTimeout(`
CREATE TRIGGER
trigger_channels
AFTER INSERT OR UPDATE OR DELETE ON
Channels
FOR EACH ROW EXECUTE PROCEDURE
channels_copy_to_public_channels();
`); err != nil {
return errors.Wrap(err, "failed to create trigger")
}
if err := transaction.Commit(); err != nil {
return errors.Wrap(err, "failed to create trigger function")
}
}
} else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
// Note that DDL statements in MySQL (CREATE TABLE, CREATE TRIGGER, etc.) cannot
// be rolled back inside a transaction (unlike PostgreSQL), so there's no point in
// wrapping what follows inside a transaction.
if !s.DoesTriggerExist("trigger_channels_insert") {
if _, err := s.GetMaster().ExecNoTimeout(`
CREATE TRIGGER
trigger_channels_insert
AFTER INSERT ON
Channels
FOR EACH ROW
BEGIN
IF NEW.Type = 'O' THEN
INSERT INTO
PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
VALUES
(NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose)
ON DUPLICATE KEY UPDATE
DeleteAt = NEW.DeleteAt,
TeamId = NEW.TeamId,
DisplayName = NEW.DisplayName,
Name = NEW.Name,
Header = NEW.Header,
Purpose = NEW.Purpose;
END IF;
END;
`); err != nil {
return errors.Wrap(err, "failed to create trigger_channels_insert trigger")
}
}
if !s.DoesTriggerExist("trigger_channels_update") {
if _, err := s.GetMaster().ExecNoTimeout(`
CREATE TRIGGER
trigger_channels_update
AFTER UPDATE ON
Channels
FOR EACH ROW
BEGIN
IF OLD.Type = 'O' AND NEW.Type != 'O' THEN
DELETE FROM
PublicChannels
WHERE
Id = NEW.Id;
ELSEIF NEW.Type = 'O' THEN
INSERT INTO
PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
VALUES
(NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose)
ON DUPLICATE KEY UPDATE
DeleteAt = NEW.DeleteAt,
TeamId = NEW.TeamId,
DisplayName = NEW.DisplayName,
Name = NEW.Name,
Header = NEW.Header,
Purpose = NEW.Purpose;
END IF;
END;
`); err != nil {
return errors.Wrap(err, "failed to create trigger_channels_update trigger")
}
}
if !s.DoesTriggerExist("trigger_channels_delete") {
if _, err := s.GetMaster().ExecNoTimeout(`
CREATE TRIGGER
trigger_channels_delete
AFTER DELETE ON
Channels
FOR EACH ROW
BEGIN
IF OLD.Type = 'O' THEN
DELETE FROM
PublicChannels
WHERE
Id = OLD.Id;
END IF;
END;
`); err != nil {
return errors.Wrap(err, "failed to create trigger_channels_delete trigger")
}
}
} else if s.DriverName() == model.DATABASE_DRIVER_SQLITE {
if _, err := s.GetMaster().ExecNoTimeout(`
CREATE TRIGGER IF NOT EXISTS
trigger_channels_insert
AFTER INSERT ON
Channels
FOR EACH ROW
WHEN NEW.Type = 'O'
BEGIN
-- Ideally, we'd leverage ON CONFLICT DO UPDATE below and make this INSERT resilient to pre-existing
-- data. However, the version of Sqlite we're compiling against doesn't support this. This isn't
-- critical, though, since we don't support Sqlite in production.
INSERT INTO
PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
VALUES
(NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose);
END;
`); err != nil {
return errors.Wrap(err, "failed to create trigger_channels_insert trigger")
}
if _, err := s.GetMaster().ExecNoTimeout(`
CREATE TRIGGER IF NOT EXISTS
trigger_channels_update_delete
AFTER UPDATE ON
Channels
FOR EACH ROW
WHEN
OLD.Type = 'O'
AND NEW.Type != 'O'
BEGIN
DELETE FROM
PublicChannels
WHERE
Id = NEW.Id;
END;
`); err != nil {
return errors.Wrap(err, "failed to create trigger_channels_update_delete trigger")
}
if _, err := s.GetMaster().ExecNoTimeout(`
CREATE TRIGGER IF NOT EXISTS
trigger_channels_update
AFTER UPDATE ON
Channels
FOR EACH ROW
WHEN
OLD.Type != 'O'
AND NEW.Type = 'O'
BEGIN
-- See comments re: ON CONFLICT DO UPDATE above that would apply here as well.
UPDATE
PublicChannels
SET
DeleteAt = NEW.DeleteAt,
TeamId = NEW.TeamId,
DisplayName = NEW.DisplayName,
Name = NEW.Name,
Header = NEW.Header,
Purpose = NEW.Purpose
WHERE
Id = NEW.Id;
END;
`); err != nil {
return errors.Wrap(err, "failed to create trigger_channels_update trigger")
}
if _, err := s.GetMaster().ExecNoTimeout(`
CREATE TRIGGER IF NOT EXISTS
trigger_channels_delete
AFTER UPDATE ON
Channels
FOR EACH ROW
WHEN
OLD.Type = 'O'
BEGIN
DELETE FROM
PublicChannels
WHERE
Id = OLD.Id;
END;
`); err != nil {
return errors.Wrap(err, "failed to create trigger_channels_delete trigger")
}
} else {
return errors.New("failed to create trigger because of missing driver")
}
return nil
}
func (s SqlChannelStoreExperimental) GetMoreChannels(teamId string, userId string, offset int, limit int) store.StoreChannel {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.GetMoreChannels(teamId, userId, offset, limit)
}
return store.Do(func(result *store.StoreResult) {
data := &model.ChannelList{}
_, err := s.GetReplica().Select(data, `
SELECT
Channels.*
FROM
Channels
JOIN
PublicChannels c ON (c.Id = Channels.Id)
WHERE
c.TeamId = :TeamId
AND c.DeleteAt = 0
AND c.Id NOT IN (
SELECT
c.Id
FROM
PublicChannels c
JOIN
ChannelMembers cm ON (cm.ChannelId = c.Id)
WHERE
c.TeamId = :TeamId
AND cm.UserId = :UserId
AND c.DeleteAt = 0
)
ORDER BY
c.DisplayName
LIMIT :Limit
OFFSET :Offset
`, map[string]interface{}{
"TeamId": teamId,
"UserId": userId,
"Limit": limit,
"Offset": offset,
})
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.GetMoreChannels", "store.sql_channel.get_more_channels.get.app_error", nil, "teamId="+teamId+", userId="+userId+", err="+err.Error(), http.StatusInternalServerError)
return
}
result.Data = data
})
}
func (s SqlChannelStoreExperimental) GetPublicChannelsForTeam(teamId string, offset int, limit int) store.StoreChannel {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.GetPublicChannelsForTeam(teamId, offset, limit)
}
return store.Do(func(result *store.StoreResult) {
data := &model.ChannelList{}
_, err := s.GetReplica().Select(data, `
SELECT
Channels.*
FROM
Channels
JOIN
PublicChannels pc ON (pc.Id = Channels.Id)
WHERE
pc.TeamId = :TeamId
AND pc.DeleteAt = 0
ORDER BY pc.DisplayName
LIMIT :Limit
OFFSET :Offset
`, map[string]interface{}{
"TeamId": teamId,
"Limit": limit,
"Offset": offset,
})
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsForTeam", "store.sql_channel.get_public_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError)
return
}
result.Data = data
})
}
func (s SqlChannelStoreExperimental) GetPublicChannelsByIdsForTeam(teamId string, channelIds []string) store.StoreChannel {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.GetPublicChannelsByIdsForTeam(teamId, channelIds)
}
return store.Do(func(result *store.StoreResult) {
props := make(map[string]interface{})
props["teamId"] = teamId
idQuery := ""
for index, channelId := range channelIds {
if len(idQuery) > 0 {
idQuery += ", "
}
props["channelId"+strconv.Itoa(index)] = channelId
idQuery += ":channelId" + strconv.Itoa(index)
}
data := &model.ChannelList{}
_, err := s.GetReplica().Select(data, `
SELECT
Channels.*
FROM
Channels
JOIN
PublicChannels pc ON (pc.Id = Channels.Id)
WHERE
pc.TeamId = :teamId
AND pc.DeleteAt = 0
AND pc.Id IN (`+idQuery+`)
ORDER BY pc.DisplayName
`, props)
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsByIdsForTeam", "store.sql_channel.get_channels_by_ids.get.app_error", nil, err.Error(), http.StatusInternalServerError)
}
if len(*data) == 0 {
result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsByIdsForTeam", "store.sql_channel.get_channels_by_ids.not_found.app_error", nil, "", http.StatusNotFound)
}
result.Data = data
})
}
func (s SqlChannelStoreExperimental) AutocompleteInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.AutocompleteInTeam(teamId, term, includeDeleted)
}
return store.Do(func(result *store.StoreResult) {
deleteFilter := "AND c.DeleteAt = 0"
if includeDeleted {
deleteFilter = ""
}
queryFormat := `
SELECT
Channels.*
FROM
Channels
JOIN
PublicChannels c ON (c.Id = Channels.Id)
WHERE
c.TeamId = :TeamId
` + deleteFilter + `
%v
LIMIT 50
`
var channels model.ChannelList
if likeClause, likeTerm := s.buildLIKEClause(term); likeClause == "" {
if _, err := s.GetReplica().Select(&channels, fmt.Sprintf(queryFormat, ""), map[string]interface{}{"TeamId": teamId}); err != nil {
result.Err = model.NewAppError("SqlChannelStore.AutocompleteInTeam", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError)
}
} else {
// Using a UNION results in index_merge and fulltext queries and is much faster than the ref
// query you would get using an OR of the LIKE and full-text clauses.
fulltextClause, fulltextTerm := s.buildFulltextClause(term)
likeQuery := fmt.Sprintf(queryFormat, "AND "+likeClause)
fulltextQuery := fmt.Sprintf(queryFormat, "AND "+fulltextClause)
query := fmt.Sprintf("(%v) UNION (%v) LIMIT 50", likeQuery, fulltextQuery)
if _, err := s.GetReplica().Select(&channels, query, map[string]interface{}{"TeamId": teamId, "LikeTerm": likeTerm, "FulltextTerm": fulltextTerm}); err != nil {
result.Err = model.NewAppError("SqlChannelStore.AutocompleteInTeam", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError)
}
}
sort.Slice(channels, func(a, b int) bool {
return strings.ToLower(channels[a].DisplayName) < strings.ToLower(channels[b].DisplayName)
})
result.Data = &channels
})
}
func (s SqlChannelStoreExperimental) SearchInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.SearchInTeam(teamId, term, includeDeleted)
}
return store.Do(func(result *store.StoreResult) {
deleteFilter := "AND c.DeleteAt = 0"
if includeDeleted {
deleteFilter = ""
}
*result = s.performSearch(`
SELECT
Channels.*
FROM
Channels
JOIN
PublicChannels c ON (c.Id = Channels.Id)
WHERE
c.TeamId = :TeamId
`+deleteFilter+`
SEARCH_CLAUSE
ORDER BY c.DisplayName
LIMIT 100
`, term, map[string]interface{}{
"TeamId": teamId,
})
})
}
func (s SqlChannelStoreExperimental) SearchMore(userId string, teamId string, term string) store.StoreChannel {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.SearchMore(userId, teamId, term)
}
return store.Do(func(result *store.StoreResult) {
*result = s.performSearch(`
SELECT
Channels.*
FROM
Channels
JOIN
PublicChannels c ON (c.Id = Channels.Id)
WHERE
c.TeamId = :TeamId
AND c.DeleteAt = 0
AND c.Id NOT IN (
SELECT
c.Id
FROM
PublicChannels c
JOIN
ChannelMembers cm ON (cm.ChannelId = c.Id)
WHERE
c.TeamId = :TeamId
AND cm.UserId = :UserId
AND c.DeleteAt = 0
)
SEARCH_CLAUSE
ORDER BY c.DisplayName
LIMIT 100
`, term, map[string]interface{}{
"TeamId": teamId,
"UserId": userId,
})
})
}
func (s SqlChannelStoreExperimental) buildLIKEClause(term string) (likeClause, likeTerm string) {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.buildLIKEClause(term)
}
likeTerm = term
searchColumns := "c.Name, c.DisplayName, c.Purpose"
// These chars must be removed from the like query.
for _, c := range ignoreLikeSearchChar {
likeTerm = strings.Replace(likeTerm, c, "", -1)
}
// These chars must be escaped in the like query.
for _, c := range escapeLikeSearchChar {
likeTerm = strings.Replace(likeTerm, c, "*"+c, -1)
}
if likeTerm == "" {
return
}
// Prepare the LIKE portion of the query.
var searchFields []string
for _, field := range strings.Split(searchColumns, ", ") {
if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
searchFields = append(searchFields, fmt.Sprintf("lower(%s) LIKE lower(%s) escape '*'", field, ":LikeTerm"))
} else {
searchFields = append(searchFields, fmt.Sprintf("%s LIKE %s escape '*'", field, ":LikeTerm"))
}
}
likeClause = fmt.Sprintf("(%s)", strings.Join(searchFields, " OR "))
likeTerm += "%"
return
}
func (s SqlChannelStoreExperimental) buildFulltextClause(term string) (fulltextClause, fulltextTerm string) {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.buildFulltextClause(term)
}
// Copy the terms as we will need to prepare them differently for each search type.
fulltextTerm = term
searchColumns := "c.Name, c.DisplayName, c.Purpose"
// These chars must be treated as spaces in the fulltext query.
for _, c := range spaceFulltextSearchChar {
fulltextTerm = strings.Replace(fulltextTerm, c, " ", -1)
}
// Prepare the FULLTEXT portion of the query.
if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
fulltextTerm = strings.Replace(fulltextTerm, "|", "", -1)
splitTerm := strings.Fields(fulltextTerm)
for i, t := range strings.Fields(fulltextTerm) {
if i == len(splitTerm)-1 {
splitTerm[i] = t + ":*"
} else {
splitTerm[i] = t + ":* &"
}
}
fulltextTerm = strings.Join(splitTerm, " ")
fulltextClause = fmt.Sprintf("((%s) @@ to_tsquery(:FulltextTerm))", convertMySQLFullTextColumnsToPostgres(searchColumns))
} else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
splitTerm := strings.Fields(fulltextTerm)
for i, t := range strings.Fields(fulltextTerm) {
splitTerm[i] = "+" + t + "*"
}
fulltextTerm = strings.Join(splitTerm, " ")
fulltextClause = fmt.Sprintf("MATCH(%s) AGAINST (:FulltextTerm IN BOOLEAN MODE)", searchColumns)
}
return
}
func (s SqlChannelStoreExperimental) performSearch(searchQuery string, term string, parameters map[string]interface{}) store.StoreResult {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.performSearch(searchQuery, term, parameters)
}
result := store.StoreResult{}
likeClause, likeTerm := s.buildLIKEClause(term)
if likeTerm == "" {
// If the likeTerm is empty after preparing, then don't bother searching.
searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "", 1)
} else {
parameters["LikeTerm"] = likeTerm
fulltextClause, fulltextTerm := s.buildFulltextClause(term)
parameters["FulltextTerm"] = fulltextTerm
searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "AND ("+likeClause+" OR "+fulltextClause+")", 1)
}
var channels model.ChannelList
if _, err := s.GetReplica().Select(&channels, searchQuery, parameters); err != nil {
result.Err = model.NewAppError("SqlChannelStore.Search", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError)
return result
}
result.Data = &channels
return result
}
func (s SqlChannelStoreExperimental) EnableExperimentalPublicChannelsMaterialization() {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
mlog.Info("Enabling experimental public channels materialization")
}
atomic.StoreUint32(s.experimentalPublicChannelsMaterializationDisabled, 0)
}
func (s SqlChannelStoreExperimental) DisableExperimentalPublicChannelsMaterialization() {
if s.IsExperimentalPublicChannelsMaterializationEnabled() {
mlog.Info("Disabling experimental public channels materialization")
}
atomic.StoreUint32(s.experimentalPublicChannelsMaterializationDisabled, 1)
}
func (s SqlChannelStoreExperimental) IsExperimentalPublicChannelsMaterializationEnabled() bool {
return atomic.LoadUint32(s.experimentalPublicChannelsMaterializationDisabled) == 0
}

View file

@ -14,7 +14,7 @@ import (
)
func TestChannelStore(t *testing.T) {
StoreTest(t, storetest.TestChannelStore)
StoreTestWithSqlSupplier(t, storetest.TestChannelStore)
}
func TestChannelStoreInternalDataTypes(t *testing.T) {

View file

@ -51,6 +51,7 @@ type SqlStore interface {
MarkSystemRanUnitTests()
DoesTableExist(tablename string) bool
DoesColumnExist(tableName string, columName string) bool
DoesTriggerExist(triggerName string) bool
CreateColumnIfNotExists(tableName string, columnName string, mySqlColType string, postgresColType string, defaultValue string) bool
CreateColumnIfNotExistsNoDefault(tableName string, columnName string, mySqlColType string, postgresColType string) bool
RemoveColumnIfExists(tableName string, columnName string) bool

View file

@ -16,10 +16,11 @@ import (
)
var storeTypes = []*struct {
Name string
Func func() (*storetest.RunningContainer, *model.SqlSettings, error)
Container *storetest.RunningContainer
Store store.Store
Name string
Func func() (*storetest.RunningContainer, *model.SqlSettings, error)
Container *storetest.RunningContainer
SqlSupplier *SqlSupplier
Store store.Store
}{
{
Name: "MySQL",
@ -44,6 +45,19 @@ func StoreTest(t *testing.T, f func(*testing.T, store.Store)) {
}
}
func StoreTestWithSqlSupplier(t *testing.T, f func(*testing.T, store.Store, storetest.SqlSupplier)) {
defer func() {
if err := recover(); err != nil {
tearDownStores()
panic(err)
}
}()
for _, st := range storeTypes {
st := st
t.Run(st.Name, func(t *testing.T) { f(t, st.Store, st.SqlSupplier) })
}
}
func initStores() {
defer func() {
if err := recover(); err != nil {
@ -64,7 +78,8 @@ func initStores() {
return
}
st.Container = container
st.Store = store.NewLayeredStore(NewSqlSupplier(*settings, nil), nil, nil)
st.SqlSupplier = NewSqlSupplier(*settings, nil)
st.Store = store.NewLayeredStore(st.SqlSupplier, nil, nil)
st.Store.MarkSystemRanUnitTests()
}()
}

View file

@ -33,6 +33,7 @@ const (
)
const (
EXIT_GENERIC_FAILURE = 1
EXIT_CREATE_TABLE = 100
EXIT_DB_OPEN = 101
EXIT_PING = 102
@ -116,8 +117,13 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter
supplier.initConnection()
enableExperimentalPublicChannelsMaterialization := true
if settings.EnablePublicChannelsMaterialization != nil && !*settings.EnablePublicChannelsMaterialization {
enableExperimentalPublicChannelsMaterialization = false
}
supplier.oldStores.team = NewSqlTeamStore(supplier)
supplier.oldStores.channel = NewSqlChannelStore(supplier, metrics)
supplier.oldStores.channel = NewSqlChannelStoreExperimental(supplier, metrics, enableExperimentalPublicChannelsMaterialization)
supplier.oldStores.post = NewSqlPostStore(supplier, metrics)
supplier.oldStores.user = NewSqlUserStore(supplier, metrics)
supplier.oldStores.audit = NewSqlAuditStore(supplier)
@ -151,10 +157,19 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter
os.Exit(EXIT_CREATE_TABLE)
}
// This store's triggers should exist before the migration is run to ensure the
// corresponding tables stay in sync. Whether or not a trigger should be created before
// or after a migration is likely to be decided on a case-by-case basis.
if err := supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateTriggersIfNotExists(); err != nil {
mlog.Critical("Error creating triggers", mlog.Err(err))
time.Sleep(time.Second)
os.Exit(EXIT_GENERIC_FAILURE)
}
UpgradeDatabase(supplier)
supplier.oldStores.team.(*SqlTeamStore).CreateIndexesIfNotExists()
supplier.oldStores.channel.(*SqlChannelStore).CreateIndexesIfNotExists()
supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateIndexesIfNotExists()
supplier.oldStores.post.(*SqlPostStore).CreateIndexesIfNotExists()
supplier.oldStores.user.(*SqlUserStore).CreateIndexesIfNotExists()
supplier.oldStores.audit.(*SqlAuditStore).CreateIndexesIfNotExists()
@ -461,6 +476,52 @@ func (ss *SqlSupplier) DoesColumnExist(tableName string, columnName string) bool
}
}
func (ss *SqlSupplier) DoesTriggerExist(triggerName string) bool {
if ss.DriverName() == model.DATABASE_DRIVER_POSTGRES {
count, err := ss.GetMaster().SelectInt(`
SELECT
COUNT(0)
FROM
pg_trigger
WHERE
tgname = $1
`, triggerName)
if err != nil {
mlog.Critical(fmt.Sprintf("Failed to check if trigger exists %v", err))
time.Sleep(time.Second)
os.Exit(EXIT_GENERIC_FAILURE)
}
return count > 0
} else if ss.DriverName() == model.DATABASE_DRIVER_MYSQL {
count, err := ss.GetMaster().SelectInt(`
SELECT
COUNT(0)
FROM
information_schema.triggers
WHERE
trigger_schema = DATABASE()
AND trigger_name = ?
`, triggerName)
if err != nil {
mlog.Critical(fmt.Sprintf("Failed to check if trigger exists %v", err))
time.Sleep(time.Second)
os.Exit(EXIT_GENERIC_FAILURE)
}
return count > 0
} else {
mlog.Critical("Failed to check if column exists because of missing driver")
time.Sleep(time.Second)
os.Exit(EXIT_GENERIC_FAILURE)
return false
}
}
func (ss *SqlSupplier) CreateColumnIfNotExists(tableName string, columnName string, mySqlColType string, postgresColType string, defaultValue string) bool {
if ss.DoesColumnExist(tableName, columnName) {

View file

@ -489,7 +489,6 @@ func UpgradeDatabaseToVersion53(sqlStore SqlStore) {
if shouldPerformUpgrade(sqlStore, VERSION_5_2_0, VERSION_5_3_0) {
saveSchemaVersion(sqlStore, VERSION_5_3_0)
}
}
func UpgradeDatabaseToVersion54(sqlStore SqlStore) {
@ -497,6 +496,11 @@ func UpgradeDatabaseToVersion54(sqlStore SqlStore) {
// if shouldPerformUpgrade(sqlStore, VERSION_5_3_0, VERSION_5_4_0) {
sqlStore.AlterColumnTypeIfExists("OutgoingWebhooks", "Description", "varchar(500)", "varchar(500)")
sqlStore.AlterColumnTypeIfExists("IncomingWebhooks", "Description", "varchar(500)", "varchar(500)")
if err := sqlStore.Channel().MigratePublicChannels(); err != nil {
mlog.Critical("Failed to migrate PublicChannels table", mlog.Err(err))
time.Sleep(time.Second)
os.Exit(EXIT_GENERIC_FAILURE)
}
// saveSchemaVersion(sqlStore, VERSION_5_4_0)
// }
}

View file

@ -174,6 +174,11 @@ type ChannelStore interface {
ResetAllChannelSchemes() StoreChannel
ClearAllCustomRoleAssignments() StoreChannel
ResetLastPostAt() StoreChannel
MigratePublicChannels() error
DropPublicChannels() error
EnableExperimentalPublicChannelsMaterialization()
DisableExperimentalPublicChannelsMaterialization()
IsExperimentalPublicChannelsMaterializationEnabled() bool
}
type ChannelMemberHistoryStore interface {

File diff suppressed because it is too large Load diff

View file

@ -130,6 +130,30 @@ func (_m *ChannelStore) Delete(channelId string, time int64) store.StoreChannel
return r0
}
// DisableExperimentalPublicChannelsMaterialization provides a mock function with given fields:
func (_m *ChannelStore) DisableExperimentalPublicChannelsMaterialization() {
_m.Called()
}
// DropPublicChannels provides a mock function with given fields:
func (_m *ChannelStore) DropPublicChannels() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// EnableExperimentalPublicChannelsMaterialization provides a mock function with given fields:
func (_m *ChannelStore) EnableExperimentalPublicChannelsMaterialization() {
_m.Called()
}
// Get provides a mock function with given fields: id, allowFromCache
func (_m *ChannelStore) Get(id string, allowFromCache bool) store.StoreChannel {
ret := _m.Called(id, allowFromCache)
@ -601,6 +625,20 @@ func (_m *ChannelStore) InvalidateMemberCount(channelId string) {
_m.Called(channelId)
}
// IsExperimentalPublicChannelsMaterializationEnabled provides a mock function with given fields:
func (_m *ChannelStore) IsExperimentalPublicChannelsMaterializationEnabled() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// IsUserInChannelUseCache provides a mock function with given fields: userId, channelId
func (_m *ChannelStore) IsUserInChannelUseCache(userId string, channelId string) bool {
ret := _m.Called(userId, channelId)
@ -631,6 +669,20 @@ func (_m *ChannelStore) MigrateChannelMembers(fromChannelId string, fromUserId s
return r0
}
// MigratePublicChannels provides a mock function with given fields:
func (_m *ChannelStore) MigratePublicChannels() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// PermanentDelete provides a mock function with given fields: channelId
func (_m *ChannelStore) PermanentDelete(channelId string) store.StoreChannel {
ret := _m.Called(channelId)

View file

@ -241,6 +241,20 @@ func (_m *SqlStore) DoesTableExist(tablename string) bool {
return r0
}
// DoesTriggerExist provides a mock function with given fields: triggerName
func (_m *SqlStore) DoesTriggerExist(triggerName string) bool {
ret := _m.Called(triggerName)
var r0 bool
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(triggerName)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DriverName provides a mock function with given fields:
func (_m *SqlStore) DriverName() string {
ret := _m.Called()

View file

@ -0,0 +1,29 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
// Regenerate this file using `make store-mocks`.
package mocks
import gorp "github.com/mattermost/gorp"
import mock "github.com/stretchr/testify/mock"
// SqlSupplier is an autogenerated mock type for the SqlSupplier type
type SqlSupplier struct {
mock.Mock
}
// GetMaster provides a mock function with given fields:
func (_m *SqlSupplier) GetMaster() *gorp.DbMap {
ret := _m.Called()
var r0 *gorp.DbMap
if rf, ok := ret.Get(0).(func() *gorp.DbMap); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*gorp.DbMap)
}
}
return r0
}