diff --git a/api4/user.go b/api4/user.go index b827857a776..fe9d331c33e 100644 --- a/api4/user.go +++ b/api4/user.go @@ -845,7 +845,7 @@ func updateUserActive(c *Context, w http.ResponseWriter, r *http.Request) { c.LogAuditWithUserId(user.Id, fmt.Sprintf("active=%v", active)) if isSelfDeactive { - c.App.Go(func() { + c.App.Srv.Go(func() { if err = c.App.SendDeactivateAccountEmail(user.Email, user.Locale, c.App.GetSiteURL()); err != nil { mlog.Error(err.Error()) } diff --git a/app/admin.go b/app/admin.go index 6055803a5bb..ba4bd2581af 100644 --- a/app/admin.go +++ b/app/admin.go @@ -139,7 +139,7 @@ func (a *App) InvalidateAllCaches() *model.AppError { func (a *App) InvalidateAllCachesSkipSend() { mlog.Info("Purging all caches") - a.sessionCache.Purge() + a.Srv.sessionCache.Purge() ClearStatusCache() a.Srv.Store.Channel().ClearCaches() a.Srv.Store.User().ClearCaches() @@ -212,8 +212,8 @@ func (a *App) RecycleDatabaseConnection() { oldStore := a.Srv.Store mlog.Warn("Attempting to recycle the database connection.") - a.Srv.Store = a.newStore() - a.Jobs.Store = a.Srv.Store + a.Srv.Store = a.Srv.newStore() + a.Srv.Jobs.Store = a.Srv.Store if a.Srv.Store != oldStore { time.Sleep(20 * time.Second) diff --git a/app/app.go b/app/app.go index e3a919bba42..f14f915e943 100644 --- a/app/app.go +++ b/app/app.go @@ -4,19 +4,15 @@ package app import ( - "crypto/ecdsa" "fmt" "html/template" "net/http" "path" "reflect" "strconv" - "sync" - "sync/atomic" "github.com/gorilla/mux" "github.com/pkg/errors" - "github.com/throttled/throttled" "github.com/mattermost/mattermost-server/einterfaces" ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" @@ -24,7 +20,6 @@ import ( tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" "github.com/mattermost/mattermost-server/mlog" "github.com/mattermost/mattermost-server/model" - "github.com/mattermost/mattermost-server/plugin" "github.com/mattermost/mattermost-server/services/httpservice" "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/store/sqlstore" @@ -35,26 +30,10 @@ const ADVANCED_PERMISSIONS_MIGRATION_KEY = "AdvancedPermissionsMigrationComplete const EMOJIS_PERMISSIONS_MIGRATION_KEY = "EmojisPermissionsMigrationComplete" type App struct { - goroutineCount int32 - goroutineExitSignal chan struct{} - Srv *Server Log *mlog.Logger - Plugins *plugin.Environment - PluginConfigListenerId string - - EmailBatching *EmailBatchingJob - EmailRateLimiter *throttled.GCRARateLimiter - - Hubs []*Hub - HubsStopCheckingForDeadlock chan bool - - PushNotificationsHub PushNotificationsHub - - Jobs *jobs.JobServer - AccountMigration einterfaces.AccountMigrationInterface Cluster einterfaces.ClusterInterface Compliance einterfaces.ComplianceInterface @@ -66,42 +45,6 @@ type App struct { Mfa einterfaces.MfaInterface Saml einterfaces.SamlInterface - config atomic.Value - envConfig map[string]interface{} - configFile string - configListeners map[string]func(*model.Config, *model.Config) - clusterLeaderListeners sync.Map - - licenseValue atomic.Value - clientLicenseValue atomic.Value - licenseListeners map[string]func() - - timezones atomic.Value - - siteURL string - - newStore func() store.Store - - htmlTemplateWatcher *utils.HTMLTemplateWatcher - sessionCache *utils.Cache - configListenerId string - licenseListenerId string - logListenerId string - clusterLeaderListenerId string - disableConfigWatch bool - configWatcher *utils.ConfigWatcher - asymmetricSigningKey *ecdsa.PrivateKey - - pluginCommands []*PluginCommand - pluginCommandsLock sync.RWMutex - - clientConfig map[string]string - clientConfigHash string - limitedClientConfig map[string]string - diagnosticId string - - phase2PermissionsMigrationComplete bool - HTTPService httpservice.HTTPService } @@ -118,15 +61,15 @@ func New(options ...Option) (outApp *App, outErr error) { rootRouter := mux.NewRouter() app := &App{ - goroutineExitSignal: make(chan struct{}, 1), Srv: &Server{ - RootRouter: rootRouter, + goroutineExitSignal: make(chan struct{}, 1), + RootRouter: rootRouter, + configFile: "config.json", + configListeners: make(map[string]func(*model.Config, *model.Config)), + licenseListeners: map[string]func(){}, + sessionCache: utils.NewLru(model.SESSION_CACHE_SIZE), + clientConfig: make(map[string]string), }, - sessionCache: utils.NewLru(model.SESSION_CACHE_SIZE), - configFile: "config.json", - configListeners: make(map[string]func(*model.Config, *model.Config)), - clientConfig: make(map[string]string), - licenseListeners: map[string]func(){}, } app.HTTPService = httpservice.MakeHTTPService(app) @@ -151,7 +94,7 @@ func New(options ...Option) (outApp *App, outErr error) { } model.AppErrorInit(utils.T) - if err := app.LoadConfig(app.configFile); err != nil { + if err := app.LoadConfig(app.Srv.configFile); err != nil { return nil, err } @@ -164,7 +107,7 @@ func New(options ...Option) (outApp *App, outErr error) { // Use this app logger as the global logger (eventually remove all instances of global logging) mlog.InitGlobalLogger(app.Log) - app.logListenerId = app.AddConfigListener(func(_, after *model.Config) { + app.Srv.logListenerId = app.AddConfigListener(func(_, after *model.Config) { app.Log.ChangeLevels(utils.MloggerConfigFromLoggerConfig(&after.LogSettings)) }) @@ -176,22 +119,22 @@ func New(options ...Option) (outApp *App, outErr error) { return nil, errors.Wrapf(err, "unable to load Mattermost translation files") } - app.configListenerId = app.AddConfigListener(func(_, _ *model.Config) { + app.Srv.configListenerId = app.AddConfigListener(func(_, _ *model.Config) { app.configOrLicenseListener() message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CONFIG_CHANGED, "", "", "", nil) message.Add("config", app.ClientConfigWithComputed()) - app.Go(func() { + app.Srv.Go(func() { app.Publish(message) }) }) - app.licenseListenerId = app.AddLicenseListener(func() { + app.Srv.licenseListenerId = app.AddLicenseListener(func() { app.configOrLicenseListener() message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_LICENSE_CHANGED, "", "", "", nil) message.Add("license", app.GetSanitizedClientLicense()) - app.Go(func() { + app.Srv.Go(func() { app.Publish(message) }) @@ -205,8 +148,8 @@ func New(options ...Option) (outApp *App, outErr error) { app.initEnterprise() - if app.newStore == nil { - app.newStore = func() store.Store { + if app.Srv.newStore == nil { + app.Srv.newStore = func() store.Store { return store.NewLayeredStore(sqlstore.NewSqlSupplier(app.Config().SqlSettings, app.Metrics), app.Metrics, app.Cluster) } } @@ -214,10 +157,10 @@ func New(options ...Option) (outApp *App, outErr error) { if htmlTemplateWatcher, err := utils.NewHTMLTemplateWatcher("templates"); err != nil { mlog.Error(fmt.Sprintf("Failed to parse server templates %v", err)) } else { - app.htmlTemplateWatcher = htmlTemplateWatcher + app.Srv.htmlTemplateWatcher = htmlTemplateWatcher } - app.Srv.Store = app.newStore() + app.Srv.Store = app.Srv.newStore() if err := app.ensureAsymmetricSigningKey(); err != nil { return nil, errors.Wrapf(err, "unable to ensure asymmetric signing key") @@ -235,9 +178,9 @@ func New(options ...Option) (outApp *App, outErr error) { app.initJobs() }) - app.clusterLeaderListenerId = app.AddClusterLeaderChangedListener(func() { + app.Srv.clusterLeaderListenerId = app.AddClusterLeaderChangedListener(func() { mlog.Info("Cluster leader changed. Determining if job schedulers should be running:", mlog.Bool("isLeader", app.IsLeader())) - app.Jobs.Schedulers.HandleClusterLeaderChange(app.IsLeader()) + app.Srv.Jobs.Schedulers.HandleClusterLeaderChange(app.IsLeader()) }) subpath, err := utils.GetSubpathFromConfig(app.Config()) @@ -279,26 +222,26 @@ func (a *App) Shutdown() { a.StopPushNotificationsHubWorkers() a.ShutDownPlugins() - a.WaitForGoroutines() + a.Srv.WaitForGoroutines() if a.Srv.Store != nil { a.Srv.Store.Close() } - a.Srv = nil - if a.htmlTemplateWatcher != nil { - a.htmlTemplateWatcher.Close() + if a.Srv.htmlTemplateWatcher != nil { + a.Srv.htmlTemplateWatcher.Close() } - a.RemoveConfigListener(a.configListenerId) - a.RemoveLicenseListener(a.licenseListenerId) - a.RemoveConfigListener(a.logListenerId) - a.RemoveClusterLeaderChangedListener(a.clusterLeaderListenerId) + a.RemoveConfigListener(a.Srv.configListenerId) + a.RemoveLicenseListener(a.Srv.licenseListenerId) + a.RemoveConfigListener(a.Srv.logListenerId) + a.RemoveClusterLeaderChangedListener(a.Srv.clusterLeaderListenerId) mlog.Info("Server stopped") a.DisableConfigWatch() a.HTTPService.Close() + a.Srv = nil } var accountMigrationInterface func(*App) einterfaces.AccountMigrationInterface @@ -439,39 +382,39 @@ func (a *App) initEnterprise() { } func (a *App) initJobs() { - a.Jobs = jobs.NewJobServer(a, a.Srv.Store) + a.Srv.Jobs = jobs.NewJobServer(a, a.Srv.Store) if jobsDataRetentionJobInterface != nil { - a.Jobs.DataRetentionJob = jobsDataRetentionJobInterface(a) + a.Srv.Jobs.DataRetentionJob = jobsDataRetentionJobInterface(a) } if jobsMessageExportJobInterface != nil { - a.Jobs.MessageExportJob = jobsMessageExportJobInterface(a) + a.Srv.Jobs.MessageExportJob = jobsMessageExportJobInterface(a) } if jobsElasticsearchAggregatorInterface != nil { - a.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a) + a.Srv.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a) } if jobsElasticsearchIndexerInterface != nil { - a.Jobs.ElasticsearchIndexer = jobsElasticsearchIndexerInterface(a) + a.Srv.Jobs.ElasticsearchIndexer = jobsElasticsearchIndexerInterface(a) } if jobsLdapSyncInterface != nil { - a.Jobs.LdapSync = jobsLdapSyncInterface(a) + a.Srv.Jobs.LdapSync = jobsLdapSyncInterface(a) } if jobsMigrationsInterface != nil { - a.Jobs.Migrations = jobsMigrationsInterface(a) + a.Srv.Jobs.Migrations = jobsMigrationsInterface(a) } - a.Jobs.Workers = a.Jobs.InitWorkers() - a.Jobs.Schedulers = a.Jobs.InitSchedulers() + a.Srv.Jobs.Workers = a.Srv.Jobs.InitWorkers() + a.Srv.Jobs.Schedulers = a.Srv.Jobs.InitSchedulers() } func (a *App) DiagnosticId() string { - return a.diagnosticId + return a.Srv.diagnosticId } func (a *App) SetDiagnosticId(id string) { - a.diagnosticId = id + a.Srv.diagnosticId = id } func (a *App) EnsureDiagnosticId() { - if a.diagnosticId != "" { + if a.Srv.diagnosticId != "" { return } if result := <-a.Srv.Store.System().Get(); result.Err == nil { @@ -484,36 +427,13 @@ func (a *App) EnsureDiagnosticId() { <-a.Srv.Store.System().Save(systemId) } - a.diagnosticId = id - } -} - -// Go creates a goroutine, but maintains a record of it to ensure that execution completes before -// the app is destroyed. -func (a *App) Go(f func()) { - atomic.AddInt32(&a.goroutineCount, 1) - - go func() { - f() - - atomic.AddInt32(&a.goroutineCount, -1) - select { - case a.goroutineExitSignal <- struct{}{}: - default: - } - }() -} - -// WaitForGoroutines blocks until all goroutines created by App.Go exit. -func (a *App) WaitForGoroutines() { - for atomic.LoadInt32(&a.goroutineCount) != 0 { - <-a.goroutineExitSignal + a.Srv.diagnosticId = id } } func (a *App) HTMLTemplates() *template.Template { - if a.htmlTemplateWatcher != nil { - return a.htmlTemplateWatcher.Templates() + if a.Srv.htmlTemplateWatcher != nil { + return a.Srv.htmlTemplateWatcher.Templates() } return nil @@ -596,7 +516,7 @@ func (a *App) SetPhase2PermissionsMigrationStatus(isComplete bool) error { return res.Err } } - a.phase2PermissionsMigrationComplete = isComplete + a.Srv.phase2PermissionsMigrationComplete = isComplete return nil } @@ -670,7 +590,7 @@ func (a *App) DoEmojisPermissionsMigration() { } func (a *App) StartElasticsearch() { - a.Go(func() { + a.Srv.Go(func() { if err := a.Elasticsearch.Start(); err != nil { mlog.Error(err.Error()) } @@ -678,19 +598,19 @@ func (a *App) StartElasticsearch() { a.AddConfigListener(func(oldConfig *model.Config, newConfig *model.Config) { if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { - a.Go(func() { + a.Srv.Go(func() { if err := a.Elasticsearch.Start(); err != nil { mlog.Error(err.Error()) } }) } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { - a.Go(func() { + a.Srv.Go(func() { if err := a.Elasticsearch.Stop(); err != nil { mlog.Error(err.Error()) } }) } else if *oldConfig.ElasticsearchSettings.Password != *newConfig.ElasticsearchSettings.Password || *oldConfig.ElasticsearchSettings.Username != *newConfig.ElasticsearchSettings.Username || *oldConfig.ElasticsearchSettings.ConnectionUrl != *newConfig.ElasticsearchSettings.ConnectionUrl || *oldConfig.ElasticsearchSettings.Sniff != *newConfig.ElasticsearchSettings.Sniff { - a.Go(func() { + a.Srv.Go(func() { if *oldConfig.ElasticsearchSettings.EnableIndexing { if err := a.Elasticsearch.Stop(); err != nil { mlog.Error(err.Error()) @@ -705,13 +625,13 @@ func (a *App) StartElasticsearch() { a.AddLicenseListener(func() { if a.License() != nil { - a.Go(func() { + a.Srv.Go(func() { if err := a.Elasticsearch.Start(); err != nil { mlog.Error(err.Error()) } }) } else { - a.Go(func() { + a.Srv.Go(func() { if err := a.Elasticsearch.Stop(); err != nil { mlog.Error(err.Error()) } diff --git a/app/app_test.go b/app/app_test.go index de9378d1a4b..00929401983 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -246,8 +246,12 @@ func TestDoAdvancedPermissionsMigration(t *testing.T) { restrictPrivateChannel := *th.App.Config().TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPrivateChannelManagement defer func() { - th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPublicChannelManagement = restrictPublicChannel }) - th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPrivateChannelManagement = restrictPrivateChannel }) + th.App.UpdateConfig(func(cfg *model.Config) { + *cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPublicChannelManagement = restrictPublicChannel + }) + th.App.UpdateConfig(func(cfg *model.Config) { + *cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPrivateChannelManagement = restrictPrivateChannel + }) }() th.App.UpdateConfig(func(cfg *model.Config) { @@ -433,8 +437,8 @@ func TestDoAdvancedPermissionsMigration(t *testing.T) { postEditTimeLimit := *th.App.Config().ServiceSettings.PostEditTimeLimit defer func() { - th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.DEPRECATED_DO_NOT_USE_AllowEditPost = allowEditPost}) - th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.PostEditTimeLimit = postEditTimeLimit}) + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.DEPRECATED_DO_NOT_USE_AllowEditPost = allowEditPost }) + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.PostEditTimeLimit = postEditTimeLimit }) }() th.App.UpdateConfig(func(cfg *model.Config) { diff --git a/app/channel.go b/app/channel.go index dee856b94d6..14cd74d87bb 100644 --- a/app/channel.go +++ b/app/channel.go @@ -212,9 +212,9 @@ func (a *App) CreateChannel(channel *model.Channel, addMember bool) (*model.Chan } if a.PluginsReady() { - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.ChannelHasBeenCreated(pluginContext, sc) return true }, plugin.ChannelHasBeenCreatedId) @@ -239,9 +239,9 @@ func (a *App) CreateDirectChannel(userId string, otherUserId string) (*model.Cha a.InvalidateCacheForUser(otherUserId) if a.PluginsReady() { - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.ChannelHasBeenCreated(pluginContext, channel) return true }, plugin.ChannelHasBeenCreatedId) @@ -854,9 +854,9 @@ func (a *App) AddChannelMember(userId string, channel *model.Channel, userReques } if a.PluginsReady() { - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.UserHasJoinedChannel(pluginContext, cm, userRequestor) return true }, plugin.UserHasJoinedChannelId) @@ -866,7 +866,7 @@ func (a *App) AddChannelMember(userId string, channel *model.Channel, userReques if userRequestorId == "" || userId == userRequestorId { a.postJoinChannelMessage(user, channel) } else { - a.Go(func() { + a.Srv.Go(func() { a.PostAddToChannelMessage(userRequestor, user, channel, postRootId) }) } @@ -1244,9 +1244,9 @@ func (a *App) JoinChannel(channel *model.Channel, userId string) *model.AppError } if a.PluginsReady() { - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.UserHasJoinedChannel(pluginContext, cm, nil) return true }, plugin.UserHasJoinedChannelId) @@ -1336,7 +1336,7 @@ func (a *App) LeaveChannel(channelId string, userId string) *model.AppError { return nil } - a.Go(func() { + a.Srv.Go(func() { a.postLeaveChannelMessage(user, channel) }) @@ -1451,9 +1451,9 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string, actorUser, _ = a.GetUser(removerUserId) } - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.UserHasLeftChannel(pluginContext, cm, actorUser) return true }, plugin.UserHasLeftChannelId) @@ -1489,7 +1489,7 @@ func (a *App) RemoveUserFromChannel(userIdToRemove string, removerUserId string, if userIdToRemove == removerUserId { a.postLeaveChannelMessage(user, channel) } else { - a.Go(func() { + a.Srv.Go(func() { a.postRemoveFromChannelMessage(removerUserId, user, channel) }) } diff --git a/app/cluster.go b/app/cluster.go index 22b9843b8c1..e166521f16c 100644 --- a/app/cluster.go +++ b/app/cluster.go @@ -13,19 +13,19 @@ import ( // be called. func (a *App) AddClusterLeaderChangedListener(listener func()) string { id := model.NewId() - a.clusterLeaderListeners.Store(id, listener) + a.Srv.clusterLeaderListeners.Store(id, listener) return id } // Removes a listener function by the unique ID returned when AddConfigListener was called func (a *App) RemoveClusterLeaderChangedListener(id string) { - a.clusterLeaderListeners.Delete(id) + a.Srv.clusterLeaderListeners.Delete(id) } func (a *App) InvokeClusterLeaderChangedListeners() { mlog.Info("Cluster leader changed. Invoking ClusterLeaderChanged listeners.") - a.Go(func() { - a.clusterLeaderListeners.Range(func(_, listener interface{}) bool { + a.Srv.Go(func() { + a.Srv.clusterLeaderListeners.Range(func(_, listener interface{}) bool { listener.(func())() return true }) diff --git a/app/command_echo.go b/app/command_echo.go index f0851964bce..14b6e11b9f8 100644 --- a/app/command_echo.go +++ b/app/command_echo.go @@ -78,7 +78,7 @@ func (me *EchoProvider) DoCommand(a *App, args *model.CommandArgs, message strin } echoSem <- true - a.Go(func() { + a.Srv.Go(func() { defer func() { <-echoSem }() post := &model.Post{} post.ChannelId = args.ChannelId diff --git a/app/compliance.go b/app/compliance.go index d46e75b245c..2d8dd2b517f 100644 --- a/app/compliance.go +++ b/app/compliance.go @@ -36,7 +36,7 @@ func (a *App) SaveComplianceReport(job *model.Compliance) (*model.Compliance, *m } job = result.Data.(*model.Compliance) - a.Go(func() { + a.Srv.Go(func() { a.Compliance.RunComplianceJob(job) }) diff --git a/app/config.go b/app/config.go index 34f4dc7519e..450e205268a 100644 --- a/app/config.go +++ b/app/config.go @@ -28,15 +28,15 @@ const ( ) func (a *App) Config() *model.Config { - if cfg := a.config.Load(); cfg != nil { + if cfg := a.Srv.config.Load(); cfg != nil { return cfg.(*model.Config) } return &model.Config{} } func (a *App) EnvironmentConfig() map[string]interface{} { - if a.envConfig != nil { - return a.envConfig + if a.Srv.envConfig != nil { + return a.Srv.envConfig } return map[string]interface{}{} } @@ -45,7 +45,7 @@ func (a *App) UpdateConfig(f func(*model.Config)) { old := a.Config() updated := old.Clone() f(updated) - a.config.Store(updated) + a.Srv.config.Store(updated) a.InvokeConfigListeners(old, updated) } @@ -62,11 +62,10 @@ func (a *App) LoadConfig(configFile string) *model.AppError { return err } *cfg.ServiceSettings.SiteURL = strings.TrimRight(*cfg.ServiceSettings.SiteURL, "/") - a.config.Store(cfg) + a.Srv.config.Store(cfg) - a.configFile = configPath - a.envConfig = envConfig - a.siteURL = *cfg.ServiceSettings.SiteURL + a.Srv.configFile = configPath + a.Srv.envConfig = envConfig a.InvokeConfigListeners(old, cfg) return nil @@ -74,7 +73,7 @@ func (a *App) LoadConfig(configFile string) *model.AppError { func (a *App) ReloadConfig() *model.AppError { debug.FreeOSMemory() - if err := a.LoadConfig(a.configFile); err != nil { + if err := a.LoadConfig(a.Srv.configFile); err != nil { return err } @@ -84,37 +83,37 @@ func (a *App) ReloadConfig() *model.AppError { } func (a *App) ConfigFileName() string { - return a.configFile + return a.Srv.configFile } func (a *App) ClientConfig() map[string]string { - return a.clientConfig + return a.Srv.clientConfig } func (a *App) ClientConfigHash() string { - return a.clientConfigHash + return a.Srv.clientConfigHash } func (a *App) LimitedClientConfig() map[string]string { - return a.limitedClientConfig + return a.Srv.limitedClientConfig } func (a *App) EnableConfigWatch() { - if a.configWatcher == nil && !a.disableConfigWatch { + if a.Srv.configWatcher == nil && !a.Srv.disableConfigWatch { configWatcher, err := utils.NewConfigWatcher(a.ConfigFileName(), func() { a.ReloadConfig() }) if err != nil { mlog.Error(fmt.Sprint(err)) } - a.configWatcher = configWatcher + a.Srv.configWatcher = configWatcher } } func (a *App) DisableConfigWatch() { - if a.configWatcher != nil { - a.configWatcher.Close() - a.configWatcher = nil + if a.Srv.configWatcher != nil { + a.Srv.configWatcher.Close() + a.Srv.configWatcher = nil } } @@ -123,17 +122,17 @@ func (a *App) DisableConfigWatch() { // for the listener that can later be used to remove it. func (a *App) AddConfigListener(listener func(*model.Config, *model.Config)) string { id := model.NewId() - a.configListeners[id] = listener + a.Srv.configListeners[id] = listener return id } // Removes a listener function by the unique ID returned when AddConfigListener was called func (a *App) RemoveConfigListener(id string) { - delete(a.configListeners, id) + delete(a.Srv.configListeners, id) } func (a *App) InvokeConfigListeners(old, current *model.Config) { - for _, listener := range a.configListeners { + for _, listener := range a.Srv.configListeners { listener(old, current) } } @@ -141,7 +140,7 @@ func (a *App) InvokeConfigListeners(old, current *model.Config) { // EnsureAsymmetricSigningKey ensures that an asymmetric signing key exists and future calls to // AsymmetricSigningKey will always return a valid signing key. func (a *App) ensureAsymmetricSigningKey() error { - if a.asymmetricSigningKey != nil { + if a.Srv.asymmetricSigningKey != nil { return nil } @@ -202,7 +201,7 @@ func (a *App) ensureAsymmetricSigningKey() error { default: return fmt.Errorf("unknown curve: " + key.ECDSAKey.Curve) } - a.asymmetricSigningKey = &ecdsa.PrivateKey{ + a.Srv.asymmetricSigningKey = &ecdsa.PrivateKey{ PublicKey: ecdsa.PublicKey{ Curve: curve, X: key.ECDSAKey.X, @@ -240,31 +239,31 @@ func (a *App) ensureInstallationDate() error { // AsymmetricSigningKey will return a private key that can be used for asymmetric signing. func (a *App) AsymmetricSigningKey() *ecdsa.PrivateKey { - return a.asymmetricSigningKey + return a.Srv.asymmetricSigningKey } func (a *App) regenerateClientConfig() { - a.clientConfig = utils.GenerateClientConfig(a.Config(), a.DiagnosticId(), a.License()) + a.Srv.clientConfig = utils.GenerateClientConfig(a.Config(), a.DiagnosticId(), a.License()) - if a.clientConfig["EnableCustomTermsOfService"] == "true" { + if a.Srv.clientConfig["EnableCustomTermsOfService"] == "true" { termsOfService, err := a.GetLatestTermsOfService() if err != nil { mlog.Err(err) } else { - a.clientConfig["CustomTermsOfServiceId"] = termsOfService.Id + a.Srv.clientConfig["CustomTermsOfServiceId"] = termsOfService.Id } } - a.limitedClientConfig = utils.GenerateLimitedClientConfig(a.Config(), a.DiagnosticId(), a.License()) + a.Srv.limitedClientConfig = utils.GenerateLimitedClientConfig(a.Config(), a.DiagnosticId(), a.License()) if key := a.AsymmetricSigningKey(); key != nil { der, _ := x509.MarshalPKIXPublicKey(&key.PublicKey) - a.clientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der) - a.limitedClientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der) + a.Srv.clientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der) + a.Srv.limitedClientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der) } - clientConfigJSON, _ := json.Marshal(a.clientConfig) - a.clientConfigHash = fmt.Sprintf("%x", md5.Sum(clientConfigJSON)) + clientConfigJSON, _ := json.Marshal(a.Srv.clientConfig) + a.Srv.clientConfigHash = fmt.Sprintf("%x", md5.Sum(clientConfigJSON)) } func (a *App) Desanitize(cfg *model.Config) { @@ -322,7 +321,7 @@ func (a *App) GetCookieDomain() string { } func (a *App) GetSiteURL() string { - return a.siteURL + return *a.Config().ServiceSettings.SiteURL } // ClientConfigWithComputed gets the configuration in a format suitable for sending to the client. diff --git a/app/config_test.go b/app/config_test.go index 1c1811b94bd..a885eb62f31 100644 --- a/app/config_test.go +++ b/app/config_test.go @@ -35,11 +35,12 @@ func TestLoadConfig(t *testing.T) { require.Nil(t, err) tempConfig.Close() - a := App{} + a := App{ + Srv: &Server{}, + } appErr := a.LoadConfig(tempConfig.Name()) require.Nil(t, appErr) - assert.Equal(t, "http://localhost:8065", a.siteURL) assert.Equal(t, "http://localhost:8065", *a.GetConfig().ServiceSettings.SiteURL) } diff --git a/app/diagnostics.go b/app/diagnostics.go index bc2684f9a43..fdbf9cab7c5 100644 --- a/app/diagnostics.go +++ b/app/diagnostics.go @@ -590,7 +590,7 @@ func (a *App) trackPlugins() { settingsCount := 0 pluginStates := a.Config().PluginSettings.PluginStates - plugins, _ := a.Plugins.Available() + plugins, _ := a.Srv.Plugins.Available() if pluginStates != nil && plugins != nil { for _, plugin := range plugins { diff --git a/app/email.go b/app/email.go index 143d4a052f9..ed9a71a1d52 100644 --- a/app/email.go +++ b/app/email.go @@ -42,7 +42,7 @@ func (a *App) SetupInviteEmailRateLimiting() error { return errors.Wrap(err, "Unable to setup email rate limiting GCRA rate limiter.") } - a.EmailRateLimiter = rateLimiter + a.Srv.EmailRateLimiter = rateLimiter return nil } @@ -286,11 +286,11 @@ func (a *App) SendMfaChangeEmail(email string, activated bool, locale, siteURL s } func (a *App) SendInviteEmails(team *model.Team, senderName string, senderUserId string, invites []string, siteURL string) { - if a.EmailRateLimiter == nil { + if a.Srv.EmailRateLimiter == nil { a.Log.Error("Email invite not sent, rate limiting could not be setup.", mlog.String("user_id", senderUserId), mlog.String("team_id", team.Id)) return } - rateLimited, result, err := a.EmailRateLimiter.RateLimit(senderUserId, len(invites)) + rateLimited, result, err := a.Srv.EmailRateLimiter.RateLimit(senderUserId, len(invites)) if err != nil { a.Log.Error("Error rate limiting invite email.", mlog.String("user_id", senderUserId), mlog.String("team_id", team.Id), mlog.Err(err)) return diff --git a/app/email_batching.go b/app/email_batching.go index ceee47f3246..6adb2b33b27 100644 --- a/app/email_batching.go +++ b/app/email_batching.go @@ -25,13 +25,13 @@ const ( func (a *App) InitEmailBatching() { if *a.Config().EmailSettings.EnableEmailBatching { - if a.EmailBatching == nil { - a.EmailBatching = NewEmailBatchingJob(a, *a.Config().EmailSettings.EmailBatchingBufferSize) + if a.Srv.EmailBatching == nil { + a.Srv.EmailBatching = NewEmailBatchingJob(a, *a.Config().EmailSettings.EmailBatchingBufferSize) } // note that we don't support changing EmailBatchingBufferSize without restarting the server - a.EmailBatching.Start() + a.Srv.EmailBatching.Start() } } @@ -40,7 +40,7 @@ func (a *App) AddNotificationEmailToBatch(user *model.User, post *model.Post, te return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.disabled.app_error", nil, "", http.StatusNotImplemented) } - if !a.EmailBatching.Add(user, post, team) { + if !a.Srv.EmailBatching.Add(user, post, team) { mlog.Error("Email batching job's receiving channel was full. Please increase the EmailBatchingBufferSize.") return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.channel_full.app_error", nil, "", http.StatusInternalServerError) } @@ -188,7 +188,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu // send the email notification if it's been long enough if now.Sub(time.Unix(batchStartTime/1000, 0)) > time.Duration(interval)*time.Second { - job.app.Go(func(userId string, notifications []*batchedNotification) func() { + job.app.Srv.Go(func(userId string, notifications []*batchedNotification) func() { return func() { handler(userId, notifications) } diff --git a/app/file.go b/app/file.go index 8d2a7fc15b0..1901a8beeb9 100644 --- a/app/file.go +++ b/app/file.go @@ -444,7 +444,7 @@ func (a *App) DoUploadFileExpectModification(now time.Time, rawTeamId string, ra if a.PluginsReady() { var rejectionError *model.AppError pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { var newBytes bytes.Buffer replacementInfo, rejectionReason := hooks.FileWillBeUploaded(pluginContext, info, bytes.NewReader(data), &newBytes) if rejectionReason != "" { diff --git a/app/job.go b/app/job.go index 782301de45d..50ce346f096 100644 --- a/app/job.go +++ b/app/job.go @@ -40,9 +40,9 @@ func (a *App) GetJobsByType(jobType string, offset int, limit int) ([]*model.Job } func (a *App) CreateJob(job *model.Job) (*model.Job, *model.AppError) { - return a.Jobs.CreateJob(job.Type, job.Data) + return a.Srv.Jobs.CreateJob(job.Type, job.Data) } func (a *App) CancelJob(jobId string) *model.AppError { - return a.Jobs.RequestCancellation(jobId) + return a.Srv.Jobs.RequestCancellation(jobId) } diff --git a/app/ldap.go b/app/ldap.go index 544905b702c..d254c656cd3 100644 --- a/app/ldap.go +++ b/app/ldap.go @@ -13,7 +13,7 @@ import ( ) func (a *App) SyncLdap() { - a.Go(func() { + a.Srv.Go(func() { if license := a.License(); license != nil && *license.Features.LDAP && *a.Config().LdapSettings.EnableSync { if ldapI := a.Ldap; ldapI != nil { @@ -67,7 +67,7 @@ func (a *App) SwitchEmailToLdap(email, password, code, ldapLoginId, ldapPassword return "", err } - a.Go(func() { + a.Srv.Go(func() { if err := a.SendSignInChangeEmail(user.Email, "AD/LDAP", user.Locale, a.GetSiteURL()); err != nil { mlog.Error(err.Error()) } @@ -113,7 +113,7 @@ func (a *App) SwitchLdapToEmail(ldapPassword, code, email, newPassword string) ( T := utils.GetUserTranslations(user.Locale) - a.Go(func() { + a.Srv.Go(func() { if err := a.SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, a.GetSiteURL()); err != nil { mlog.Error(err.Error()) } diff --git a/app/license.go b/app/license.go index ec18ec31869..ea7f7d4b0c0 100644 --- a/app/license.go +++ b/app/license.go @@ -93,10 +93,10 @@ func (a *App) SaveLicense(licenseBytes []byte) (*model.License, *model.AppError) // doesn't start until the server is restarted, which prevents the 'run job now' buttons in system console from // functioning as expected if *a.Config().JobSettings.RunJobs { - a.Jobs.StartWorkers() + a.Srv.Jobs.StartWorkers() } if *a.Config().JobSettings.RunScheduler { - a.Jobs.StartSchedulers() + a.Srv.Jobs.StartSchedulers() } return license, nil @@ -104,13 +104,13 @@ func (a *App) SaveLicense(licenseBytes []byte) (*model.License, *model.AppError) // License returns the currently active license or nil if the application is unlicensed. func (a *App) License() *model.License { - license, _ := a.licenseValue.Load().(*model.License) + license, _ := a.Srv.licenseValue.Load().(*model.License) return license } func (a *App) SetLicense(license *model.License) bool { defer func() { - for _, listener := range a.licenseListeners { + for _, listener := range a.Srv.licenseListeners { listener() } }() @@ -119,14 +119,14 @@ func (a *App) SetLicense(license *model.License) bool { license.Features.SetDefaults() if !license.IsExpired() { - a.licenseValue.Store(license) - a.clientLicenseValue.Store(utils.GetClientLicense(license)) + a.Srv.licenseValue.Store(license) + a.Srv.clientLicenseValue.Store(utils.GetClientLicense(license)) return true } } - a.licenseValue.Store((*model.License)(nil)) - a.clientLicenseValue.Store(map[string]string(nil)) + a.Srv.licenseValue.Store((*model.License)(nil)) + a.Srv.clientLicenseValue.Store(map[string]string(nil)) return false } @@ -141,18 +141,18 @@ func (a *App) ValidateAndSetLicenseBytes(b []byte) { } func (a *App) SetClientLicense(m map[string]string) { - a.clientLicenseValue.Store(m) + a.Srv.clientLicenseValue.Store(m) } func (a *App) ClientLicense() map[string]string { - if clientLicense, _ := a.clientLicenseValue.Load().(map[string]string); clientLicense != nil { + if clientLicense, _ := a.Srv.clientLicenseValue.Load().(map[string]string); clientLicense != nil { return clientLicense } return map[string]string{"IsLicensed": "false"} } func (a *App) RemoveLicense() *model.AppError { - if license, _ := a.licenseValue.Load().(*model.License); license == nil { + if license, _ := a.Srv.licenseValue.Load().(*model.License); license == nil { return nil } @@ -174,12 +174,12 @@ func (a *App) RemoveLicense() *model.AppError { func (a *App) AddLicenseListener(listener func()) string { id := model.NewId() - a.licenseListeners[id] = listener + a.Srv.licenseListeners[id] = listener return id } func (a *App) RemoveLicenseListener(id string) { - delete(a.licenseListeners, id) + delete(a.Srv.licenseListeners, id) } func (a *App) GetClientLicenseEtag(useSanitized bool) string { diff --git a/app/login.go b/app/login.go index 01cdde38630..28f207f75f8 100644 --- a/app/login.go +++ b/app/login.go @@ -69,7 +69,7 @@ func (a *App) AuthenticateUserForLogin(id, loginId, password, mfaToken string, l if a.PluginsReady() { var rejectionReason string pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { rejectionReason = hooks.UserWillLogIn(pluginContext, user) return rejectionReason == "" }, plugin.UserWillLogInId) @@ -78,9 +78,9 @@ func (a *App) AuthenticateUserForLogin(id, loginId, password, mfaToken string, l return nil, model.NewAppError("AuthenticateUserForLogin", "Login rejected by plugin: "+rejectionReason, nil, "", http.StatusBadRequest) } - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.UserHasLoggedIn(pluginContext, user) return true }, plugin.UserHasLoggedInId) diff --git a/app/notification.go b/app/notification.go index 54f1f470da7..4501b311d14 100644 --- a/app/notification.go +++ b/app/notification.go @@ -78,7 +78,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod } if post.Type != model.POST_AUTO_RESPONDER { - a.Go(func() { + a.Srv.Go(func() { a.SendAutoResponse(channel, otherUser) }) } @@ -127,7 +127,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod if result := <-a.Srv.Store.User().GetProfilesByUsernames(m.OtherPotentialMentions, team.Id); result.Err == nil { outOfChannelMentions := result.Data.([]*model.User) if channel.Type != model.CHANNEL_GROUP { - a.Go(func() { + a.Srv.Go(func() { a.sendOutOfChannelMentions(sender, post, outOfChannelMentions) }) } diff --git a/app/notification_email.go b/app/notification_email.go index 9eb842084dc..8333ee80989 100644 --- a/app/notification_email.go +++ b/app/notification_email.go @@ -103,7 +103,7 @@ func (a *App) sendNotificationEmail(notification *postNotification, user *model. teamURL := a.GetSiteURL() + "/" + team.Name var bodyText = a.getNotificationEmailBody(user, post, channel, channelName, senderName, team.Name, teamURL, emailNotificationContentsType, useMilitaryTime, translateFunc) - a.Go(func() { + a.Srv.Go(func() { if err := a.SendMail(user.Email, html.UnescapeString(subjectText), bodyText); err != nil { mlog.Error(fmt.Sprint("Error to send the email", user.Email, err)) } diff --git a/app/notification_push.go b/app/notification_push.go index a17ccb3755c..eb392bc220b 100644 --- a/app/notification_push.go +++ b/app/notification_push.go @@ -132,7 +132,7 @@ func (a *App) sendPushNotification(notification *postNotification, user *model.U channelName := notification.GetChannelName(nameFormat, user.Id) senderName := notification.GetSenderName(nameFormat, cfg.ServiceSettings.EnablePostUsernameOverride) - c := a.PushNotificationsHub.GetGoChannelFromUserId(user.Id) + c := a.Srv.PushNotificationsHub.GetGoChannelFromUserId(user.Id) c <- PushNotification{ notificationType: NOTIFICATION_TYPE_MESSAGE, post: post, @@ -217,7 +217,7 @@ func (a *App) ClearPushNotificationSync(userId string, channelId string) { } func (a *App) ClearPushNotification(userId string, channelId string) { - channel := a.PushNotificationsHub.GetGoChannelFromUserId(userId) + channel := a.Srv.PushNotificationsHub.GetGoChannelFromUserId(userId) channel <- PushNotification{ notificationType: NOTIFICATION_TYPE_CLEAR, userId: userId, @@ -232,7 +232,7 @@ func (a *App) CreatePushNotificationsHub() { for x := 0; x < PUSH_NOTIFICATION_HUB_WORKERS; x++ { hub.Channels = append(hub.Channels, make(chan PushNotification, PUSH_NOTIFICATIONS_HUB_BUFFER_PER_WORKER)) } - a.PushNotificationsHub = hub + a.Srv.PushNotificationsHub = hub } func (a *App) pushNotificationWorker(notifications chan PushNotification) { @@ -259,13 +259,13 @@ func (a *App) pushNotificationWorker(notifications chan PushNotification) { func (a *App) StartPushNotificationsHubWorkers() { for x := 0; x < PUSH_NOTIFICATION_HUB_WORKERS; x++ { - channel := a.PushNotificationsHub.Channels[x] - a.Go(func() { a.pushNotificationWorker(channel) }) + channel := a.Srv.PushNotificationsHub.Channels[x] + a.Srv.Go(func() { a.pushNotificationWorker(channel) }) } } func (a *App) StopPushNotificationsHubWorkers() { - for _, channel := range a.PushNotificationsHub.Channels { + for _, channel := range a.Srv.PushNotificationsHub.Channels { close(channel) } } diff --git a/app/oauth.go b/app/oauth.go index 645d502f5e6..e1e02f929bd 100644 --- a/app/oauth.go +++ b/app/oauth.go @@ -601,7 +601,7 @@ func (a *App) CompleteSwitchWithOAuth(service string, userData io.ReadCloser, em return nil, result.Err } - a.Go(func() { + a.Srv.Go(func() { if err := a.SendSignInChangeEmail(user.Email, strings.Title(service)+" SSO", user.Locale, a.GetSiteURL()); err != nil { mlog.Error(err.Error()) } @@ -859,7 +859,7 @@ func (a *App) SwitchOAuthToEmail(email, password, requesterId string) (string, * T := utils.GetUserTranslations(user.Locale) - a.Go(func() { + a.Srv.Go(func() { if err := a.SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, a.GetSiteURL()); err != nil { mlog.Error(err.Error()) } diff --git a/app/options.go b/app/options.go index 4645660242b..d490ecb06c8 100644 --- a/app/options.go +++ b/app/options.go @@ -17,11 +17,11 @@ func StoreOverride(override interface{}) Option { return func(a *App) { switch o := override.(type) { case store.Store: - a.newStore = func() store.Store { + a.Srv.newStore = func() store.Store { return o } case func(*App) store.Store: - a.newStore = func() store.Store { + a.Srv.newStore = func() store.Store { return o(a) } default: @@ -32,10 +32,10 @@ func StoreOverride(override interface{}) Option { func ConfigFile(file string) Option { return func(a *App) { - a.configFile = file + a.Srv.configFile = file } } func DisableConfigWatch(a *App) { - a.disableConfigWatch = true + a.Srv.disableConfigWatch = true } diff --git a/app/plugin.go b/app/plugin.go index 2c18c6cecfa..2c1eb3862ad 100644 --- a/app/plugin.go +++ b/app/plugin.go @@ -16,21 +16,21 @@ import ( ) func (a *App) SyncPluginsActiveState() { - if a.Plugins == nil { + if a.Srv.Plugins == nil { return } config := a.Config().PluginSettings if *config.Enable { - availablePlugins, err := a.Plugins.Available() + availablePlugins, err := a.Srv.Plugins.Available() if err != nil { a.Log.Error("Unable to get available plugins", mlog.Err(err)) return } // Deactivate any plugins that have been disabled. - for _, plugin := range a.Plugins.Active() { + for _, plugin := range a.Srv.Plugins.Active() { // Determine if plugin is enabled pluginId := plugin.Manifest.Id pluginEnabled := false @@ -40,7 +40,7 @@ func (a *App) SyncPluginsActiveState() { // If it's not enabled we need to deactivate it if !pluginEnabled { - deactivated := a.Plugins.Deactivate(pluginId) + deactivated := a.Srv.Plugins.Deactivate(pluginId) if deactivated && plugin.Manifest.HasClient() { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DISABLED, "", "", "", nil) message.Add("manifest", plugin.Manifest.ClientManifest()) @@ -65,7 +65,7 @@ func (a *App) SyncPluginsActiveState() { // Activate plugin if enabled if pluginEnabled { - updatedManifest, activated, err := a.Plugins.Activate(pluginId) + updatedManifest, activated, err := a.Srv.Plugins.Activate(pluginId) if err != nil { plugin.WrapLogger(a.Log).Error("Unable to activate plugin", mlog.Err(err)) continue @@ -79,7 +79,7 @@ func (a *App) SyncPluginsActiveState() { } } } else { // If plugins are disabled, shutdown plugins. - a.Plugins.Shutdown() + a.Srv.Plugins.Shutdown() } if err := a.notifyPluginStatusesChanged(); err != nil { @@ -92,7 +92,7 @@ func (a *App) NewPluginAPI(manifest *model.Manifest) plugin.API { } func (a *App) InitPlugins(pluginDir, webappPluginDir string) { - if a.Plugins != nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins != nil || !*a.Config().PluginSettings.Enable { a.SyncPluginsActiveState() return } @@ -113,7 +113,7 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) { mlog.Error("Failed to start up plugins", mlog.Err(err)) return } else { - a.Plugins = env + a.Srv.Plugins = env } prepackagedPluginsDir, found := utils.FindDir("prepackaged_plugins") @@ -136,10 +136,10 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) { } // Sync plugin active state when config changes. Also notify plugins. - a.RemoveConfigListener(a.PluginConfigListenerId) - a.PluginConfigListenerId = a.AddConfigListener(func(*model.Config, *model.Config) { + a.RemoveConfigListener(a.Srv.PluginConfigListenerId) + a.Srv.PluginConfigListenerId = a.AddConfigListener(func(*model.Config, *model.Config) { a.SyncPluginsActiveState() - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.OnConfigurationChange() return true }, plugin.OnConfigurationChangeId) @@ -150,25 +150,25 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) { } func (a *App) ShutDownPlugins() { - if a.Plugins == nil { + if a.Srv.Plugins == nil { return } mlog.Info("Shutting down plugins") - a.Plugins.Shutdown() + a.Srv.Plugins.Shutdown() - a.RemoveConfigListener(a.PluginConfigListenerId) - a.PluginConfigListenerId = "" - a.Plugins = nil + a.RemoveConfigListener(a.Srv.PluginConfigListenerId) + a.Srv.PluginConfigListenerId = "" + a.Srv.Plugins = nil } func (a *App) GetActivePluginManifests() ([]*model.Manifest, *model.AppError) { - if a.Plugins == nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable { return nil, model.NewAppError("GetActivePluginManifests", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) } - plugins := a.Plugins.Active() + plugins := a.Srv.Plugins.Active() manifests := make([]*model.Manifest, len(plugins)) for i, plugin := range plugins { @@ -181,11 +181,11 @@ func (a *App) GetActivePluginManifests() ([]*model.Manifest, *model.AppError) { // EnablePlugin will set the config for an installed plugin to enabled, triggering asynchronous // activation if inactive anywhere in the cluster. func (a *App) EnablePlugin(id string) *model.AppError { - if a.Plugins == nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable { return model.NewAppError("EnablePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) } - plugins, err := a.Plugins.Available() + plugins, err := a.Srv.Plugins.Available() if err != nil { return model.NewAppError("EnablePlugin", "app.plugin.config.app_error", nil, err.Error(), http.StatusInternalServerError) } @@ -221,11 +221,11 @@ func (a *App) EnablePlugin(id string) *model.AppError { // DisablePlugin will set the config for an installed plugin to disabled, triggering deactivation if active. func (a *App) DisablePlugin(id string) *model.AppError { - if a.Plugins == nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable { return model.NewAppError("DisablePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) } - plugins, err := a.Plugins.Available() + plugins, err := a.Srv.Plugins.Available() if err != nil { return model.NewAppError("DisablePlugin", "app.plugin.config.app_error", nil, err.Error(), http.StatusInternalServerError) } @@ -256,7 +256,7 @@ func (a *App) DisablePlugin(id string) *model.AppError { } func (a *App) PluginsReady() bool { - return a.Plugins != nil && *a.Config().PluginSettings.Enable + return a.Srv.Plugins != nil && *a.Config().PluginSettings.Enable } func (a *App) GetPlugins() (*model.PluginsResponse, *model.AppError) { @@ -264,7 +264,7 @@ func (a *App) GetPlugins() (*model.PluginsResponse, *model.AppError) { return nil, model.NewAppError("GetPlugins", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) } - availablePlugins, err := a.Plugins.Available() + availablePlugins, err := a.Srv.Plugins.Available() if err != nil { return nil, model.NewAppError("GetPlugins", "app.plugin.get_plugins.app_error", nil, err.Error(), http.StatusInternalServerError) } @@ -278,7 +278,7 @@ func (a *App) GetPlugins() (*model.PluginsResponse, *model.AppError) { Manifest: *plugin.Manifest, } - if a.Plugins.IsActive(plugin.Manifest.Id) { + if a.Srv.Plugins.IsActive(plugin.Manifest.Id) { resp.Active = append(resp.Active, info) } else { resp.Inactive = append(resp.Inactive, info) diff --git a/app/plugin_api_test.go b/app/plugin_api_test.go index d95ae38fb97..6c51189deea 100644 --- a/app/plugin_api_test.go +++ b/app/plugin_api_test.go @@ -33,7 +33,7 @@ func setupPluginApiTest(t *testing.T, pluginCode string, pluginManifest string, ioutil.WriteFile(filepath.Join(pluginDir, pluginId, "plugin.json"), []byte(pluginManifest), 0600) env.Activate(pluginId) - app.Plugins = env + app.Srv.Plugins = env } func TestPluginAPIUpdateUserStatus(t *testing.T) { @@ -120,7 +120,7 @@ func TestPluginAPILoadPluginConfiguration(t *testing.T) { } ] }}`, "testloadpluginconfig", th.App) - hooks, err := th.App.Plugins.HooksForPlugin("testloadpluginconfig") + hooks, err := th.App.Srv.Plugins.HooksForPlugin("testloadpluginconfig") assert.NoError(t, err) _, ret := hooks.MessageWillBePosted(nil, nil) assert.Equal(t, "str32true", ret) @@ -194,7 +194,7 @@ func TestPluginAPILoadPluginConfigurationDefaults(t *testing.T) { } ] }}`, "testloadpluginconfig", th.App) - hooks, err := th.App.Plugins.HooksForPlugin("testloadpluginconfig") + hooks, err := th.App.Srv.Plugins.HooksForPlugin("testloadpluginconfig") assert.NoError(t, err) _, ret := hooks.MessageWillBePosted(nil, nil) assert.Equal(t, "override35true", ret) diff --git a/app/plugin_commands.go b/app/plugin_commands.go index daa92ce5ac6..cbaeb507c95 100644 --- a/app/plugin_commands.go +++ b/app/plugin_commands.go @@ -31,10 +31,10 @@ func (a *App) RegisterPluginCommand(pluginId string, command *model.Command) err DisplayName: command.DisplayName, } - a.pluginCommandsLock.Lock() - defer a.pluginCommandsLock.Unlock() + a.Srv.pluginCommandsLock.Lock() + defer a.Srv.pluginCommandsLock.Unlock() - for _, pc := range a.pluginCommands { + for _, pc := range a.Srv.pluginCommands { if pc.Command.Trigger == command.Trigger && pc.Command.TeamId == command.TeamId { if pc.PluginId == pluginId { pc.Command = command @@ -43,7 +43,7 @@ func (a *App) RegisterPluginCommand(pluginId string, command *model.Command) err } } - a.pluginCommands = append(a.pluginCommands, &PluginCommand{ + a.Srv.pluginCommands = append(a.Srv.pluginCommands, &PluginCommand{ Command: command, PluginId: pluginId, }) @@ -53,37 +53,37 @@ func (a *App) RegisterPluginCommand(pluginId string, command *model.Command) err func (a *App) UnregisterPluginCommand(pluginId, teamId, trigger string) { trigger = strings.ToLower(trigger) - a.pluginCommandsLock.Lock() - defer a.pluginCommandsLock.Unlock() + a.Srv.pluginCommandsLock.Lock() + defer a.Srv.pluginCommandsLock.Unlock() var remaining []*PluginCommand - for _, pc := range a.pluginCommands { + for _, pc := range a.Srv.pluginCommands { if pc.Command.TeamId != teamId || pc.Command.Trigger != trigger { remaining = append(remaining, pc) } } - a.pluginCommands = remaining + a.Srv.pluginCommands = remaining } func (a *App) UnregisterPluginCommands(pluginId string) { - a.pluginCommandsLock.Lock() - defer a.pluginCommandsLock.Unlock() + a.Srv.pluginCommandsLock.Lock() + defer a.Srv.pluginCommandsLock.Unlock() var remaining []*PluginCommand - for _, pc := range a.pluginCommands { + for _, pc := range a.Srv.pluginCommands { if pc.PluginId != pluginId { remaining = append(remaining, pc) } } - a.pluginCommands = remaining + a.Srv.pluginCommands = remaining } func (a *App) PluginCommandsForTeam(teamId string) []*model.Command { - a.pluginCommandsLock.RLock() - defer a.pluginCommandsLock.RUnlock() + a.Srv.pluginCommandsLock.RLock() + defer a.Srv.pluginCommandsLock.RUnlock() var commands []*model.Command - for _, pc := range a.pluginCommands { + for _, pc := range a.Srv.pluginCommands { if pc.Command.TeamId == "" || pc.Command.TeamId == teamId { commands = append(commands, pc.Command) } @@ -96,12 +96,12 @@ func (a *App) ExecutePluginCommand(args *model.CommandArgs) (*model.Command, *mo trigger := parts[0][1:] trigger = strings.ToLower(trigger) - a.pluginCommandsLock.RLock() - defer a.pluginCommandsLock.RUnlock() + a.Srv.pluginCommandsLock.RLock() + defer a.Srv.pluginCommandsLock.RUnlock() - for _, pc := range a.pluginCommands { + for _, pc := range a.Srv.pluginCommands { if (pc.Command.TeamId == "" || pc.Command.TeamId == args.TeamId) && pc.Command.Trigger == trigger { - pluginHooks, err := a.Plugins.HooksForPlugin(pc.PluginId) + pluginHooks, err := a.Srv.Plugins.HooksForPlugin(pc.PluginId) if err != nil { return pc.Command, nil, model.NewAppError("ExecutePluginCommand", "model.plugin_command.error.app_error", nil, "err="+err.Error(), http.StatusInternalServerError) } diff --git a/app/plugin_hooks_test.go b/app/plugin_hooks_test.go index 766385af419..9016b84e46d 100644 --- a/app/plugin_hooks_test.go +++ b/app/plugin_hooks_test.go @@ -44,7 +44,7 @@ func SetAppEnvironmentWithPlugins(t *testing.T, pluginCode []string, app *App, a env, err := plugin.NewEnvironment(apiFunc, pluginDir, webappPluginDir, app.Log) require.NoError(t, err) - app.Plugins = env + app.Srv.Plugins = env pluginIds := []string{} activationErrors := []error{} for _, code := range pluginCode { diff --git a/app/plugin_install.go b/app/plugin_install.go index 65e69735620..aaa6e46a533 100644 --- a/app/plugin_install.go +++ b/app/plugin_install.go @@ -22,7 +22,7 @@ func (a *App) InstallPlugin(pluginFile io.Reader, replace bool) (*model.Manifest } func (a *App) installPlugin(pluginFile io.Reader, replace bool) (*model.Manifest, *model.AppError) { - if a.Plugins == nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable { return nil, model.NewAppError("installPlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) } @@ -55,7 +55,7 @@ func (a *App) installPlugin(pluginFile io.Reader, replace bool) (*model.Manifest return nil, model.NewAppError("installPlugin", "app.plugin.invalid_id.app_error", map[string]interface{}{"Min": plugin.MinIdLength, "Max": plugin.MaxIdLength, "Regex": plugin.ValidIdRegex}, "", http.StatusBadRequest) } - bundles, err := a.Plugins.Available() + bundles, err := a.Srv.Plugins.Available() if err != nil { return nil, model.NewAppError("installPlugin", "app.plugin.install.app_error", nil, err.Error(), http.StatusInternalServerError) } @@ -91,11 +91,11 @@ func (a *App) RemovePlugin(id string) *model.AppError { } func (a *App) removePlugin(id string) *model.AppError { - if a.Plugins == nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable { return model.NewAppError("removePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) } - plugins, err := a.Plugins.Available() + plugins, err := a.Srv.Plugins.Available() if err != nil { return model.NewAppError("removePlugin", "app.plugin.deactivate.app_error", nil, err.Error(), http.StatusBadRequest) } @@ -114,13 +114,13 @@ func (a *App) removePlugin(id string) *model.AppError { return model.NewAppError("removePlugin", "app.plugin.not_installed.app_error", nil, "", http.StatusBadRequest) } - if a.Plugins.IsActive(id) && manifest.HasClient() { + if a.Srv.Plugins.IsActive(id) && manifest.HasClient() { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DISABLED, "", "", "", nil) message.Add("manifest", manifest.ClientManifest()) a.Publish(message) } - a.Plugins.Deactivate(id) + a.Srv.Plugins.Deactivate(id) a.UnregisterPluginCommands(id) err = os.RemoveAll(pluginPath) diff --git a/app/plugin_requests.go b/app/plugin_requests.go index cf1033ce4a9..4070a060f43 100644 --- a/app/plugin_requests.go +++ b/app/plugin_requests.go @@ -19,7 +19,7 @@ import ( ) func (a *App) ServePluginRequest(w http.ResponseWriter, r *http.Request) { - if a.Plugins == nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable { err := model.NewAppError("ServePluginRequest", "app.plugin.disabled.app_error", nil, "Enable plugins to serve plugin requests", http.StatusNotImplemented) a.Log.Error(err.Error()) w.WriteHeader(err.StatusCode) @@ -29,7 +29,7 @@ func (a *App) ServePluginRequest(w http.ResponseWriter, r *http.Request) { } params := mux.Vars(r) - hooks, err := a.Plugins.HooksForPlugin(params["plugin_id"]) + hooks, err := a.Srv.Plugins.HooksForPlugin(params["plugin_id"]) if err != nil { a.Log.Error("Access to route for non-existent plugin", mlog.String("missing_plugin_id", params["plugin_id"]), mlog.Err(err)) http.NotFound(w, r) diff --git a/app/plugin_statuses.go b/app/plugin_statuses.go index 4922f0fc1c1..df6477f89bd 100644 --- a/app/plugin_statuses.go +++ b/app/plugin_statuses.go @@ -11,11 +11,11 @@ import ( // GetPluginStatuses returns the status for plugins installed on this server. func (a *App) GetPluginStatuses() (model.PluginStatuses, *model.AppError) { - if a.Plugins == nil || !*a.Config().PluginSettings.Enable { + if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable { return nil, model.NewAppError("GetPluginStatuses", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) } - pluginStatuses, err := a.Plugins.Statuses() + pluginStatuses, err := a.Srv.Plugins.Statuses() if err != nil { return nil, model.NewAppError("GetPluginStatuses", "app.plugin.get_statuses.app_error", nil, err.Error(), http.StatusInternalServerError) } diff --git a/app/post.go b/app/post.go index c882fc0562d..019cf207f52 100644 --- a/app/post.go +++ b/app/post.go @@ -150,7 +150,7 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo if a.PluginsReady() { var rejectionError *model.AppError pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { replacementPost, rejectionReason := hooks.MessageWillBePosted(pluginContext, post) if rejectionReason != "" { rejectionError = model.NewAppError("createPost", "Post rejected by plugin. "+rejectionReason, nil, "", http.StatusBadRequest) @@ -174,9 +174,9 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo rpost := result.Data.(*model.Post) if a.PluginsReady() { - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.MessageHasBeenPosted(pluginContext, rpost) return true }, plugin.MessageHasBeenPostedId) @@ -185,7 +185,7 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo esInterface := a.Elasticsearch if esInterface != nil && *a.Config().ElasticsearchSettings.EnableIndexing { - a.Go(func() { + a.Srv.Go(func() { esInterface.IndexPost(rpost, channel.TeamId) }) } @@ -277,7 +277,7 @@ func (a *App) handlePostEvents(post *model.Post, user *model.User, channel *mode } if triggerWebhooks { - a.Go(func() { + a.Srv.Go(func() { if err := a.handleWebhookEvents(post, team, channel, user); err != nil { mlog.Error(err.Error()) } @@ -362,7 +362,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model if a.PluginsReady() { var rejectionReason string pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { newPost, rejectionReason = hooks.MessageWillBeUpdated(pluginContext, newPost, oldPost) return post != nil }, plugin.MessageWillBeUpdatedId) @@ -378,9 +378,9 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model rpost := result.Data.(*model.Post) if a.PluginsReady() { - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.MessageHasBeenUpdated(pluginContext, newPost, oldPost) return true }, plugin.MessageHasBeenUpdatedId) @@ -389,7 +389,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model esInterface := a.Elasticsearch if esInterface != nil && *a.Config().ElasticsearchSettings.EnableIndexing { - a.Go(func() { + a.Srv.Go(func() { rchannel := <-a.Srv.Store.Channel().GetForPost(rpost.Id) if rchannel.Err != nil { mlog.Error(fmt.Sprintf("Couldn't get channel %v for post %v for Elasticsearch indexing.", rpost.ChannelId, rpost.Id)) @@ -567,16 +567,16 @@ func (a *App) DeletePost(postId, deleteByID string) (*model.Post, *model.AppErro message.Add("post", a.PostWithProxyAddedToImageURLs(post).ToJson()) a.Publish(message) - a.Go(func() { + a.Srv.Go(func() { a.DeletePostFiles(post) }) - a.Go(func() { + a.Srv.Go(func() { a.DeleteFlaggedPosts(post.Id) }) esInterface := a.Elasticsearch if esInterface != nil && *a.Config().ElasticsearchSettings.EnableIndexing { - a.Go(func() { + a.Srv.Go(func() { esInterface.DeletePost(post) }) } diff --git a/app/post_test.go b/app/post_test.go index 5d93d3f0f5a..3b5bc910cd4 100644 --- a/app/post_test.go +++ b/app/post_test.go @@ -648,9 +648,9 @@ func TestMaxPostSize(t *testing.T) { app := App{ Srv: &Server{ - Store: mockStore, + Store: mockStore, + config: atomic.Value{}, }, - config: atomic.Value{}, } assert.Equal(t, testCase.ExpectedMaxPostSize, app.MaxPostSize()) diff --git a/app/reaction.go b/app/reaction.go index 41fc7fca41f..9b30e610fcd 100644 --- a/app/reaction.go +++ b/app/reaction.go @@ -42,7 +42,7 @@ func (a *App) SaveReactionForPost(reaction *model.Reaction) (*model.Reaction, *m reaction = result.Data.(*model.Reaction) - a.Go(func() { + a.Srv.Go(func() { a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_ADDED, reaction, post, true) }) @@ -92,7 +92,7 @@ func (a *App) DeleteReactionForPost(reaction *model.Reaction) *model.AppError { return result.Err } - a.Go(func() { + a.Srv.Go(func() { a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_REMOVED, reaction, post, hasReactions) }) diff --git a/app/role.go b/app/role.go index 8a0e9deceab..81415750767 100644 --- a/app/role.go +++ b/app/role.go @@ -104,7 +104,7 @@ func (a *App) sendUpdatedRoleEvent(role *model.Role) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ROLE_UPDATED, "", "", "", nil) message.Add("role", role.ToJson()) - a.Go(func() { + a.Srv.Go(func() { a.Publish(message) }) } diff --git a/app/scheme.go b/app/scheme.go index 0fa2647b2e2..b3bbb2614c1 100644 --- a/app/scheme.go +++ b/app/scheme.go @@ -152,7 +152,7 @@ func (a *App) GetChannelsForScheme(scheme *model.Scheme, offset int, limit int) } func (a *App) IsPhase2MigrationCompleted() *model.AppError { - if a.phase2PermissionsMigrationComplete { + if a.Srv.phase2PermissionsMigrationComplete { return nil } @@ -160,7 +160,7 @@ func (a *App) IsPhase2MigrationCompleted() *model.AppError { return model.NewAppError("App.IsPhase2MigrationCompleted", "app.schemes.is_phase_2_migration_completed.not_completed.app_error", nil, result.Err.Error(), http.StatusNotImplemented) } - a.phase2PermissionsMigrationComplete = true + a.Srv.phase2PermissionsMigrationComplete = true return nil } diff --git a/app/server.go b/app/server.go index b95059c8407..53e5e039da1 100644 --- a/app/server.go +++ b/app/server.go @@ -5,6 +5,7 @@ package app import ( "context" + "crypto/ecdsa" "crypto/tls" "fmt" "io" @@ -14,16 +15,21 @@ import ( "net/url" "os" "strings" + "sync" + "sync/atomic" "time" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/pkg/errors" "github.com/rs/cors" + "github.com/throttled/throttled" "golang.org/x/crypto/acme/autocert" + "github.com/mattermost/mattermost-server/jobs" "github.com/mattermost/mattermost-server/mlog" "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/plugin" "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/utils" ) @@ -44,6 +50,79 @@ type Server struct { RateLimiter *RateLimiter didFinishListen chan struct{} + + goroutineCount int32 + goroutineExitSignal chan struct{} + + Plugins *plugin.Environment + PluginConfigListenerId string + + EmailBatching *EmailBatchingJob + EmailRateLimiter *throttled.GCRARateLimiter + + Hubs []*Hub + HubsStopCheckingForDeadlock chan bool + + PushNotificationsHub PushNotificationsHub + + Jobs *jobs.JobServer + + config atomic.Value + envConfig map[string]interface{} + configFile string + configListeners map[string]func(*model.Config, *model.Config) + clusterLeaderListeners sync.Map + + licenseValue atomic.Value + clientLicenseValue atomic.Value + licenseListeners map[string]func() + + timezones atomic.Value + + newStore func() store.Store + + htmlTemplateWatcher *utils.HTMLTemplateWatcher + sessionCache *utils.Cache + configListenerId string + licenseListenerId string + logListenerId string + clusterLeaderListenerId string + disableConfigWatch bool + configWatcher *utils.ConfigWatcher + asymmetricSigningKey *ecdsa.PrivateKey + + pluginCommands []*PluginCommand + pluginCommandsLock sync.RWMutex + + clientConfig map[string]string + clientConfigHash string + limitedClientConfig map[string]string + diagnosticId string + + phase2PermissionsMigrationComplete bool +} + +// Go creates a goroutine, but maintains a record of it to ensure that execution completes before +// the app is destroyed. +func (s *Server) Go(f func()) { + atomic.AddInt32(&s.goroutineCount, 1) + + go func() { + f() + + atomic.AddInt32(&s.goroutineCount, -1) + select { + case s.goroutineExitSignal <- struct{}{}: + default: + } + }() +} + +// WaitForGoroutines blocks until all goroutines created by App.Go exit. +func (s *Server) WaitForGoroutines() { + for atomic.LoadInt32(&s.goroutineCount) != 0 { + <-s.goroutineExitSignal + } } var corsAllowedMethods = []string{ diff --git a/app/session.go b/app/session.go index 6d46f3d3a94..e30e03bc1e2 100644 --- a/app/session.go +++ b/app/session.go @@ -29,7 +29,7 @@ func (a *App) GetSession(token string) (*model.Session, *model.AppError) { metrics := a.Metrics var session *model.Session - if ts, ok := a.sessionCache.Get(token); ok { + if ts, ok := a.Srv.sessionCache.Get(token); ok { session = ts.(*model.Session) if metrics != nil { metrics.IncrementMemCacheHitCounterSession() @@ -137,13 +137,13 @@ func (a *App) ClearSessionCacheForUser(userId string) { } func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) { - keys := a.sessionCache.Keys() + keys := a.Srv.sessionCache.Keys() for _, key := range keys { - if ts, ok := a.sessionCache.Get(key); ok { + if ts, ok := a.Srv.sessionCache.Get(key); ok { session := ts.(*model.Session) if session.UserId == userId { - a.sessionCache.Remove(key) + a.Srv.sessionCache.Remove(key) if a.Metrics != nil { a.Metrics.IncrementMemCacheInvalidationCounterSession() } @@ -155,11 +155,11 @@ func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) { } func (a *App) AddSessionToCache(session *model.Session) { - a.sessionCache.AddWithExpiresInSecs(session.Token, session, int64(*a.Config().ServiceSettings.SessionCacheInMinutes*60)) + a.Srv.sessionCache.AddWithExpiresInSecs(session.Token, session, int64(*a.Config().ServiceSettings.SessionCacheInMinutes*60)) } func (a *App) SessionCacheLength() int { - return a.sessionCache.Len() + return a.Srv.sessionCache.Len() } func (a *App) RevokeSessionsForDeviceId(userId string, deviceId string, currentSessionId string) *model.AppError { diff --git a/app/session_test.go b/app/session_test.go index 8349a4cecf9..4eb6999151b 100644 --- a/app/session_test.go +++ b/app/session_test.go @@ -22,16 +22,16 @@ func TestCache(t *testing.T) { UserId: model.NewId(), } - th.App.sessionCache.AddWithExpiresInSecs(session.Token, session, 5*60) + th.App.Srv.sessionCache.AddWithExpiresInSecs(session.Token, session, 5*60) - keys := th.App.sessionCache.Keys() + keys := th.App.Srv.sessionCache.Keys() if len(keys) <= 0 { t.Fatal("should have items") } th.App.ClearSessionCacheForUser(session.UserId) - rkeys := th.App.sessionCache.Keys() + rkeys := th.App.Srv.sessionCache.Keys() if len(rkeys) != len(keys)-1 { t.Fatal("should have one less") } diff --git a/app/team.go b/app/team.go index 34216b7fc19..6180611048f 100644 --- a/app/team.go +++ b/app/team.go @@ -466,9 +466,9 @@ func (a *App) JoinUserToTeam(team *model.Team, user *model.User, userRequestorId actor, _ = a.GetUser(userRequestorId) } - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.UserHasJoinedTeam(pluginContext, tm, actor) return true }, plugin.UserHasJoinedTeamId) @@ -789,9 +789,9 @@ func (a *App) LeaveTeam(team *model.Team, user *model.User, requestorId string) actor, _ = a.GetUser(requestorId) } - a.Go(func() { + a.Srv.Go(func() { pluginContext := &plugin.Context{} - a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { + a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool { hooks.UserHasLeftTeam(pluginContext, teamMember, actor) return true }, plugin.UserHasLeftTeamId) diff --git a/app/timezone.go b/app/timezone.go index 84d912da68b..e014eb025c6 100644 --- a/app/timezone.go +++ b/app/timezone.go @@ -9,7 +9,7 @@ import ( ) func (a *App) Timezones() model.SupportedTimezones { - if cfg := a.timezones.Load(); cfg != nil { + if cfg := a.Srv.timezones.Load(); cfg != nil { return cfg.(model.SupportedTimezones) } return model.SupportedTimezones{} @@ -24,5 +24,5 @@ func (a *App) LoadTimezones() { timezoneCfg := utils.LoadTimezones(timezonePath) - a.timezones.Store(timezoneCfg) + a.Srv.timezones.Store(timezoneCfg) } diff --git a/app/user.go b/app/user.go index e86cbbf3ee2..92269e618f3 100644 --- a/app/user.go +++ b/app/user.go @@ -1034,14 +1034,14 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, if sendNotifications { if rusers[0].Email != rusers[1].Email { - a.Go(func() { + a.Srv.Go(func() { if err := a.SendEmailChangeEmail(rusers[1].Email, rusers[0].Email, rusers[0].Locale, a.GetSiteURL()); err != nil { mlog.Error(err.Error()) } }) if a.Config().EmailSettings.RequireEmailVerification { - a.Go(func() { + a.Srv.Go(func() { if err := a.SendEmailVerification(rusers[0]); err != nil { mlog.Error(err.Error()) } @@ -1050,7 +1050,7 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, } if rusers[0].Username != rusers[1].Username { - a.Go(func() { + a.Srv.Go(func() { if err := a.SendChangeUsernameEmail(rusers[1].Username, rusers[0].Username, rusers[0].Email, rusers[0].Locale, a.GetSiteURL()); err != nil { mlog.Error(err.Error()) } @@ -1090,7 +1090,7 @@ func (a *App) UpdateMfa(activate bool, userId, token string) *model.AppError { } } - a.Go(func() { + a.Srv.Go(func() { user, err := a.GetUser(userId) if err != nil { mlog.Error(err.Error()) @@ -1133,7 +1133,7 @@ func (a *App) UpdatePasswordSendEmail(user *model.User, newPassword, method stri return err } - a.Go(func() { + a.Srv.Go(func() { if err := a.SendPasswordChangeEmail(user.Email, method, user.Locale, a.GetSiteURL()); err != nil { mlog.Error(err.Error()) } diff --git a/app/web_conn.go b/app/web_conn.go index 084480bbd64..7eea5d3bb35 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -45,7 +45,7 @@ type WebConn struct { func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn { if len(session.UserId) > 0 { - a.Go(func() { + a.Srv.Go(func() { a.SetStatusOnline(session.UserId, false) a.UpdateLastActivityAtIfNeeded(session) }) @@ -126,7 +126,7 @@ func (c *WebConn) readPump() { c.WebSocket.SetPongHandler(func(string) error { c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT)) if c.IsAuthenticated() { - c.App.Go(func() { + c.App.Srv.Go(func() { c.App.SetStatusAwayIfNeeded(c.UserId, false) }) } @@ -212,7 +212,7 @@ func (c *WebConn) writePump() { } if c.App.Metrics != nil { - c.App.Go(func() { + c.App.Srv.Go(func() { c.App.Metrics.IncrementWebSocketBroadcast(msg.EventType()) }) } diff --git a/app/web_hub.go b/app/web_hub.go index 5bb86ee3823..f65bec931de 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -62,7 +62,7 @@ func (a *App) NewWebHub() *Hub { func (a *App) TotalWebsocketConnections() int { count := int64(0) - for _, hub := range a.Hubs { + for _, hub := range a.Srv.Hubs { count = count + atomic.LoadInt64(&hub.connectionCount) } @@ -74,13 +74,13 @@ func (a *App) HubStart() { numberOfHubs := runtime.NumCPU() * 2 mlog.Info(fmt.Sprintf("Starting %v websocket hubs", numberOfHubs)) - a.Hubs = make([]*Hub, numberOfHubs) - a.HubsStopCheckingForDeadlock = make(chan bool, 1) + a.Srv.Hubs = make([]*Hub, numberOfHubs) + a.Srv.HubsStopCheckingForDeadlock = make(chan bool, 1) - for i := 0; i < len(a.Hubs); i++ { - a.Hubs[i] = a.NewWebHub() - a.Hubs[i].connectionIndex = i - a.Hubs[i].Start() + for i := 0; i < len(a.Srv.Hubs); i++ { + a.Srv.Hubs[i] = a.NewWebHub() + a.Srv.Hubs[i].connectionIndex = i + a.Srv.Hubs[i].Start() } go func() { @@ -93,7 +93,7 @@ func (a *App) HubStart() { for { select { case <-ticker.C: - for _, hub := range a.Hubs { + for _, hub := range a.Srv.Hubs { if len(hub.broadcast) >= DEADLOCK_WARN { mlog.Error(fmt.Sprintf("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))) buf := make([]byte, 1<<16) @@ -109,7 +109,7 @@ func (a *App) HubStart() { } } - case <-a.HubsStopCheckingForDeadlock: + case <-a.Srv.HubsStopCheckingForDeadlock: return } } @@ -120,27 +120,27 @@ func (a *App) HubStop() { mlog.Info("stopping websocket hub connections") select { - case a.HubsStopCheckingForDeadlock <- true: + case a.Srv.HubsStopCheckingForDeadlock <- true: default: mlog.Warn("We appear to have already sent the stop checking for deadlocks command") } - for _, hub := range a.Hubs { + for _, hub := range a.Srv.Hubs { hub.Stop() } - a.Hubs = []*Hub{} + a.Srv.Hubs = []*Hub{} } func (a *App) GetHubForUserId(userId string) *Hub { - if len(a.Hubs) == 0 { + if len(a.Srv.Hubs) == 0 { return nil } hash := fnv.New32a() hash.Write([]byte(userId)) - index := hash.Sum32() % uint32(len(a.Hubs)) - return a.Hubs[index] + index := hash.Sum32() % uint32(len(a.Srv.Hubs)) + return a.Srv.Hubs[index] } func (a *App) HubRegister(webConn *WebConn) { @@ -190,7 +190,7 @@ func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) { hub.Broadcast(message) } } else { - for _, hub := range a.Hubs { + for _, hub := range a.Srv.Hubs { hub.Broadcast(message) } } @@ -416,7 +416,7 @@ func (h *Hub) Start() { conns := connections.ForUser(webCon.UserId) if len(conns) == 0 { - h.app.Go(func() { + h.app.Srv.Go(func() { h.app.SetStatusOffline(webCon.UserId, false) }) } else { @@ -427,7 +427,7 @@ func (h *Hub) Start() { } } if h.app.IsUserAway(latestActivity) { - h.app.Go(func() { + h.app.Srv.Go(func() { h.app.SetStatusLastActivityAt(webCon.UserId, latestActivity) }) } diff --git a/app/webhook.go b/app/webhook.go index 2557a99e6f9..0d62947dfdb 100644 --- a/app/webhook.go +++ b/app/webhook.go @@ -80,7 +80,7 @@ func (a *App) handleWebhookEvents(post *model.Post, team *model.Team, channel *m TriggerWord: triggerWord, FileIds: strings.Join(post.FileIds, ","), } - a.Go(func(hook *model.OutgoingWebhook) func() { + a.Srv.Go(func(hook *model.OutgoingWebhook) func() { return func() { a.TriggerWebhook(payload, hook, post, channel) } @@ -102,7 +102,7 @@ func (a *App) TriggerWebhook(payload *model.OutgoingWebhookPayload, hook *model. } for _, url := range hook.CallbackURLs { - a.Go(func(url string) func() { + a.Srv.Go(func(url string) func() { return func() { req, _ := http.NewRequest("POST", url, body) req.Header.Set("Content-Type", contentType) diff --git a/app/websocket_router.go b/app/websocket_router.go index c7c9943089e..38937d0fa40 100644 --- a/app/websocket_router.go +++ b/app/websocket_router.go @@ -55,7 +55,7 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque return } - wr.app.Go(func() { + wr.app.Srv.Go(func() { wr.app.SetStatusOnline(session.UserId, false) wr.app.UpdateLastActivityAtIfNeeded(*session) }) diff --git a/cmd/mattermost/commands/jobserver.go b/cmd/mattermost/commands/jobserver.go index 253ada9323a..238fcaca353 100644 --- a/cmd/mattermost/commands/jobserver.go +++ b/cmd/mattermost/commands/jobserver.go @@ -44,12 +44,12 @@ func jobserverCmdF(command *cobra.Command, args []string) { defer mlog.Info("Stopped Mattermost job server") if !noJobs { - a.Jobs.StartWorkers() - defer a.Jobs.StopWorkers() + a.Srv.Jobs.StartWorkers() + defer a.Srv.Jobs.StopWorkers() } if !noSchedule { - a.Jobs.StartSchedulers() - defer a.Jobs.StopSchedulers() + a.Srv.Jobs.StartSchedulers() + defer a.Srv.Jobs.StopSchedulers() } signalChan := make(chan os.Signal, 1) diff --git a/cmd/mattermost/commands/server.go b/cmd/mattermost/commands/server.go index e996e6a2e03..dbced49b7b4 100644 --- a/cmd/mattermost/commands/server.go +++ b/cmd/mattermost/commands/server.go @@ -147,19 +147,19 @@ func runServer(configFileLocation string, disableConfigWatch bool, usedPlatform manualtesting.Init(api) } - a.Go(func() { + a.Srv.Go(func() { runSecurityJob(a) }) - a.Go(func() { + a.Srv.Go(func() { runDiagnosticsJob(a) }) - a.Go(func() { + a.Srv.Go(func() { runSessionCleanupJob(a) }) - a.Go(func() { + a.Srv.Go(func() { runTokenCleanupJob(a) }) - a.Go(func() { + a.Srv.Go(func() { runCommandWebhookCleanupJob(a) }) @@ -181,12 +181,12 @@ func runServer(configFileLocation string, disableConfigWatch bool, usedPlatform } if *a.Config().JobSettings.RunJobs { - a.Jobs.StartWorkers() - defer a.Jobs.StopWorkers() + a.Srv.Jobs.StartWorkers() + defer a.Srv.Jobs.StopWorkers() } if *a.Config().JobSettings.RunScheduler { - a.Jobs.StartSchedulers() - defer a.Jobs.StopSchedulers() + a.Srv.Jobs.StartSchedulers() + defer a.Srv.Jobs.StopSchedulers() } notifyReady() diff --git a/migrations/scheduler.go b/migrations/scheduler.go index 5778c5cb3b2..9baa56e30ac 100644 --- a/migrations/scheduler.go +++ b/migrations/scheduler.go @@ -61,7 +61,7 @@ func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, las // Check the migration job isn't wedged. if job != nil && job.LastActivityAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS && job.CreateAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS { mlog.Warn("Job appears to be wedged. Rescheduling another instance.", mlog.String("scheduler", scheduler.Name()), mlog.String("wedged_job_id", job.Id), mlog.String("migration_key", key)) - if err := scheduler.App.Jobs.SetJobError(job, nil); err != nil { + if err := scheduler.App.Srv.Jobs.SetJobError(job, nil); err != nil { mlog.Error("Worker: Failed to set job error", mlog.String("scheduler", scheduler.Name()), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) } return scheduler.createJob(key, job, scheduler.App.Srv.Store) @@ -102,7 +102,7 @@ func (scheduler *Scheduler) createJob(migrationKey string, lastJob *model.Job, s JOB_DATA_KEY_MIGRATION_LAST_DONE: lastDone, } - if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil { + if job, err := scheduler.App.Srv.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil { return nil, err } else { return job, nil diff --git a/migrations/worker.go b/migrations/worker.go index 7a64dd6093e..e50b5157826 100644 --- a/migrations/worker.go +++ b/migrations/worker.go @@ -33,7 +33,7 @@ func (m *MigrationsJobInterfaceImpl) MakeWorker() model.Worker { stop: make(chan bool, 1), stopped: make(chan bool, 1), jobs: make(chan model.Job), - jobServer: m.App.Jobs, + jobServer: m.App.Srv.Jobs, app: m.App, } @@ -83,7 +83,7 @@ func (worker *Worker) DoJob(job *model.Job) { cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) cancelWatcherChan := make(chan interface{}, 1) - go worker.app.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) + go worker.app.Srv.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) defer cancelCancelWatcher() @@ -111,7 +111,7 @@ func (worker *Worker) DoJob(job *model.Job) { return } else { job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE] = progress - if err := worker.app.Jobs.UpdateInProgressJobData(job); err != nil { + if err := worker.app.Srv.Jobs.UpdateInProgressJobData(job); err != nil { mlog.Error("Worker: Failed to update migration status data for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) worker.setJobError(job, err) return @@ -122,20 +122,20 @@ func (worker *Worker) DoJob(job *model.Job) { } func (worker *Worker) setJobSuccess(job *model.Job) { - if err := worker.app.Jobs.SetJobSuccess(job); err != nil { + if err := worker.app.Srv.Jobs.SetJobSuccess(job); err != nil { mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) worker.setJobError(job, err) } } func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) { - if err := worker.app.Jobs.SetJobError(job, appError); err != nil { + if err := worker.app.Srv.Jobs.SetJobError(job, appError); err != nil { mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) } } func (worker *Worker) setJobCanceled(job *model.Job) { - if err := worker.app.Jobs.SetJobCanceled(job); err != nil { + if err := worker.app.Srv.Jobs.SetJobCanceled(job); err != nil { mlog.Error("Worker: Failed to mark job as canceled", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) } } diff --git a/plugin/scheduler/scheduler.go b/plugin/scheduler/scheduler.go index 7214d6cfdf6..fade4fdc17a 100644 --- a/plugin/scheduler/scheduler.go +++ b/plugin/scheduler/scheduler.go @@ -39,7 +39,7 @@ func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, p func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) { mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name())) - if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil { + if job, err := scheduler.App.Srv.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil { return nil, err } else { return job, nil diff --git a/plugin/scheduler/worker.go b/plugin/scheduler/worker.go index 252e100fab1..ec293defea0 100644 --- a/plugin/scheduler/worker.go +++ b/plugin/scheduler/worker.go @@ -25,7 +25,7 @@ func (m *PluginsJobInterfaceImpl) MakeWorker() model.Worker { stop: make(chan bool, 1), stopped: make(chan bool, 1), jobs: make(chan model.Job), - jobServer: m.App.Jobs, + jobServer: m.App.Srv.Jobs, app: m.App, } @@ -86,14 +86,14 @@ func (worker *Worker) DoJob(job *model.Job) { } func (worker *Worker) setJobSuccess(job *model.Job) { - if err := worker.app.Jobs.SetJobSuccess(job); err != nil { + if err := worker.app.Srv.Jobs.SetJobSuccess(job); err != nil { mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) worker.setJobError(job, err) } } func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) { - if err := worker.app.Jobs.SetJobError(job, appError); err != nil { + if err := worker.app.Srv.Jobs.SetJobError(job, appError); err != nil { mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) } } diff --git a/web/saml.go b/web/saml.go index 773b3498080..a1dc9569c72 100644 --- a/web/saml.go +++ b/web/saml.go @@ -115,7 +115,7 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) { case model.OAUTH_ACTION_SIGNUP: teamId := relayProps["team_id"] if len(teamId) > 0 { - c.App.Go(func() { + c.App.Srv.Go(func() { if err := c.App.AddUserToTeamByTeamId(teamId, user); err != nil { mlog.Error(err.Error()) } else { @@ -129,7 +129,7 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) { return } c.LogAuditWithUserId(user.Id, "Revoked all sessions for user") - c.App.Go(func() { + c.App.Srv.Go(func() { if err := c.App.SendSignInChangeEmail(user.Email, strings.Title(model.USER_AUTH_SERVICE_SAML)+" SSO", user.Locale, c.App.GetSiteURL()); err != nil { mlog.Error(err.Error()) }