mirror of
https://github.com/mattermost/mattermost.git
synced 2026-04-15 05:57:37 -04:00
* ci: enable fullyparallel mode for server tests Replace os.Setenv, os.Chdir, and global state mutations with parallel-safe alternatives (t.Setenv, t.Chdir, test hooks) across 37 files. Refactor GetLogRootPath and MM_INSTALL_TYPE to use package-level test hooks instead of environment variables. This enables gotestsum --fullparallel, allowing all test packages to run with maximum parallelism within each shard. Co-authored-by: Claude <claude@anthropic.com> * ci: split fullyparallel from continue-on-error in workflow template - Add new boolean input 'allow-failure' separate from 'fullyparallel' - Change continue-on-error to use allow-failure instead of fullyparallel - Update server-ci.yml to pass allow-failure: true for test coverage job - Allows independent control of parallel execution and failure tolerance Co-authored-by: Claude <claude@anthropic.com> * fix: protect TestOverrideLogRootPath with sync.Mutex for parallel tests - Replace global var TestOverrideLogRootPath with mutex-protected functions - Add SetTestOverrideLogRootPath() and getTestOverrideLogRootPath() functions - Update GetLogRootPath() to use thread-safe getter - Update all test files to use SetTestOverrideLogRootPath() with t.Cleanup() - Fixes race condition when running tests with t.Parallel() Co-authored-by: Claude <claude@anthropic.com> * fix: configure audit settings before server setup in tests - Move ExperimentalAuditSettings from UpdateConfig() to config defaults - Pass audit config via app.Config() option in SetupWithServerOptions() - Fixes audit test setup ordering to configure BEFORE server initialization - Resolves CodeRabbit's audit config timing issue in api4 tests Co-authored-by: Claude <claude@anthropic.com> * fix: implement SetTestOverrideLogRootPath mutex in logger.go The previous commit updated test callers to use SetTestOverrideLogRootPath() but didn't actually create the function in config/logger.go, causing build failures across all CI shards. 
This commit: - Replaces the exported var TestOverrideLogRootPath with mutex-protected unexported state (testOverrideLogRootPath + testOverrideLogRootMu) - Adds exported SetTestOverrideLogRootPath() setter - Adds unexported getTestOverrideLogRootPath() getter - Updates GetLogRootPath() to use the thread-safe getter - Fixes log_test.go callers that were missed in the previous commit Co-authored-by: Claude <claude@anthropic.com> * fix(test): use SetupConfig for access_control feature flag registration InitAccessControlPolicy() checks FeatureFlags.AttributeBasedAccessControl at route registration time during server startup. Setting the flag via UpdateConfig after Setup() is too late — routes are never registered and API calls return 404. Use SetupConfig() to pass the feature flag in the initial config before server startup, ensuring routes are properly registered. Co-authored-by: Claude <claude@anthropic.com> * fix(test): restore BurnOnRead flag state in TestRevealPost subtest The 'feature not enabled' subtest disables BurnOnRead without restoring it via t.Cleanup. Subsequent subtests inherit the disabled state, which can cause 501 errors when they expect the feature to be available. Add t.Cleanup to restore FeatureFlags.BurnOnRead = true after the subtest completes. Co-authored-by: Claude <claude@anthropic.com> * fix(test): restore EnableSharedChannelsMemberSync flag via t.Cleanup The test disables EnableSharedChannelsMemberSync without restoring it. If the subtest exits early (e.g., require failure), later sibling subtests inherit a disabled flag and become flaky. Add t.Cleanup to restore the flag after the subtest completes. Co-authored-by: Claude <claude@anthropic.com> * Fix test parallelism: use instance-scoped overrides and init-time audit config Replace package-level test globals (TestOverrideInstallType, SetTestOverrideLogRootPath) with fields on PlatformService so each test gets its own instance without process-wide mutation. 
Fix three audit tests (TestUserLoginAudit, TestLogoutAuditAuthStatus, TestUpdatePasswordAudit) that configured the audit logger after server init — the audit logger only reads config at startup, so pass audit settings via app.Config() at init time instead. Also revert the Go 1.24.13 downgrade and bump mattermost-govet to v2.0.2 for Go 1.25.8 compatibility. * Fix audit unit tests * Fix MMCLOUDURL unit tests * Fixed unit tests using MM_NOTIFY_ADMIN_COOL_OFF_DAYS * Make app migrations idempotent for parallel test safety Change System().Save() to System().SaveOrUpdate() in all migration completion markers. When two parallel tests share a database pool entry, both may race through the check-then-insert migration pattern. Save() causes a duplicate key fatal crash; SaveOrUpdate() makes the second write a harmless no-op. * test: address review feedback on fullyparallel PR - Use SetLogRootPathOverride() setter instead of direct field access in platform/support_packet_test.go and platform/log_test.go (pvev) - Restore TestGetLogRootPath in config/logger_test.go to keep MM_LOG_PATH env var coverage; test uses t.Setenv so it runs serially which is fine (pvev) - Fix misleading comment in config_test.go: code uses t.Setenv, not os.Setenv (jgheithcock) Co-authored-by: Claude <claude@anthropic.com> * fix: add missing os import in post_test.go The os import was dropped during a merge conflict resolution while burn-on-read shared channel tests from master still use os.Setenv. Co-authored-by: Claude <claude@anthropic.com> --------- Co-authored-by: Claude <claude@anthropic.com> Co-authored-by: wiggin77 <wiggin77@warpmail.net> Co-authored-by: Mattermost Build <build@mattermost.com>
665 lines
21 KiB
Go
665 lines
21 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package platform
|
|
|
|
import (
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"fmt"
|
|
"hash/maphash"
|
|
"net/http"
|
|
"runtime"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
"github.com/mattermost/mattermost/server/public/plugin"
|
|
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
|
"github.com/mattermost/mattermost/server/v8/channels/app/featureflag"
|
|
"github.com/mattermost/mattermost/server/v8/channels/jobs"
|
|
"github.com/mattermost/mattermost/server/v8/channels/store"
|
|
"github.com/mattermost/mattermost/server/v8/channels/store/localcachelayer"
|
|
"github.com/mattermost/mattermost/server/v8/channels/store/retrylayer"
|
|
"github.com/mattermost/mattermost/server/v8/channels/store/searchlayer"
|
|
"github.com/mattermost/mattermost/server/v8/channels/store/sqlstore"
|
|
"github.com/mattermost/mattermost/server/v8/channels/store/timerlayer"
|
|
"github.com/mattermost/mattermost/server/v8/config"
|
|
"github.com/mattermost/mattermost/server/v8/einterfaces"
|
|
"github.com/mattermost/mattermost/server/v8/platform/services/cache"
|
|
"github.com/mattermost/mattermost/server/v8/platform/services/searchengine"
|
|
"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
|
|
)
|
|
|
|
// PlatformService is the service for the platform related tasks. It is
// responsible for non-entity related functionalities that are required
// by a product such as database access, configuration access, licensing etc.
type PlatformService struct {
	// Store stack: sqlStore is the bottom SQL layer; Store is the fully
	// decorated store (retry/search/timer/cache) produced by newStore.
	sqlStore     *sqlstore.SqlStore
	Store        store.Store
	newStore     func() (store.Store, error)
	storeOptions []sqlstore.Option

	WebSocketRouter *WebSocketRouter

	configStore *config.Store

	// filestore is the primary file backend; exportFilestore may point to a
	// dedicated export backend, or alias filestore when none is configured.
	filestore       filestore.FileBackend
	exportFilestore filestore.FileBackend

	// Channel for batching status updates
	statusUpdateChan       chan *model.Status
	statusUpdateExitSignal chan struct{}
	statusUpdateDoneSignal chan struct{}

	cacheProvider cache.Provider
	statusCache   cache.Cache
	sessionCache  cache.Cache

	// Cached client-visible configuration, regenerated on config/license change.
	asymmetricSigningKey atomic.Pointer[ecdsa.PrivateKey]
	clientConfig         atomic.Value
	clientConfigHash     atomic.Value
	limitedClientConfig  atomic.Value

	isFirstUserAccountLock sync.Mutex
	isFirstUserAccount     atomic.Bool

	logger *mlog.Logger

	startMetrics bool
	metrics      *platformMetrics
	metricsIFace einterfaces.MetricsInterface

	featureFlagSynchronizerMutex sync.Mutex
	featureFlagSynchronizer      *featureflag.Synchronizer
	featureFlagStop              chan struct{}
	featureFlagStopped           chan struct{}

	licenseValue       atomic.Pointer[model.License]
	clientLicenseValue atomic.Value
	licenseListeners   map[string]func(*model.License, *model.License)
	licenseManager     einterfaces.LicenseInterface

	// Listener IDs retained so Shutdown/ShutdownConfig can deregister them.
	telemetryId       string
	configListenerId  string
	licenseListenerId string

	clusterLeaderListeners sync.Map
	clusterIFace           einterfaces.ClusterInterface
	Busy                   *Busy

	SearchEngine            *searchengine.Broker
	searchConfigListenerId  string
	searchLicenseListenerId string

	ldapDiagnostic einterfaces.LdapDiagnosticInterface

	Jobs *jobs.JobServer

	hubs     []*Hub
	hashSeed maphash.Seed

	// Bookkeeping for goroutines started via ps.Go, drained by waitForGoroutines.
	goroutineCount      int32
	goroutineExitSignal chan struct{}
	goroutineBuffered   chan struct{}

	additionalClusterHandlers map[model.ClusterEvent]einterfaces.ClusterMessageHandler

	// shareChannelServiceMux guards sharedChannelService (set/get helpers below).
	shareChannelServiceMux sync.RWMutex
	sharedChannelService   SharedChannelServiceIFace

	pluginEnv HookRunner

	// This is a test mode setting used to enable Redis
	// without a license.
	forceEnableRedis bool

	pdpService einterfaces.PolicyDecisionPointInterface

	// installTypeOverride overrides MM_INSTALL_TYPE in support packet diagnostics.
	installTypeOverride string

	// logRootPathOverride overrides MM_LOG_PATH for log root path validation.
	logRootPathOverride string
}
|
|
|
|
// SetInstallTypeOverride sets the install type override for support packet diagnostics.
// The write is not synchronized; NOTE(review): presumably set once during test setup
// before the service is used concurrently — confirm no concurrent readers.
func (ps *PlatformService) SetInstallTypeOverride(v string) {
	ps.installTypeOverride = v
}
|
|
|
|
// SetLogRootPathOverride sets the log root path override for log file validation.
// The write is not synchronized; NOTE(review): presumably set once during test setup
// before the service is used concurrently — confirm no concurrent readers.
func (ps *PlatformService) SetLogRootPathOverride(v string) {
	ps.logRootPathOverride = v
}
|
|
|
|
// HookRunner abstracts the plugin environment so the platform service can run
// plugin hooks without depending directly on the app layer that owns plugins.
type HookRunner interface {
	// RunMultiHook invokes hookRunnerFunc against active plugins' hooks for the
	// given hook ID. NOTE(review): the meaning of the callback's bool return is
	// defined by the implementer — confirm against the app-layer implementation.
	RunMultiHook(hookRunnerFunc func(hooks plugin.Hooks, _ *model.Manifest) bool, hookId int)
	// GetPluginsEnvironment returns the underlying plugin environment; callers
	// in this file treat a nil result as "plugins disabled".
	GetPluginsEnvironment() *plugin.Environment
}
|
|
|
|
// New creates a new PlatformService.
//
// Initialization proceeds in numbered steps whose order is load-bearing:
// options → config store → logging → cache → search → enterprise → metrics
// interface → store → status/session caches → license → filestore → metrics
// server → signing key → listeners. Returns a fully initialized (but not yet
// Started) service, or an error describing the first step that failed.
func New(sc ServiceConfig, options ...Option) (*PlatformService, error) {
	// Step 0: Create the PlatformService.
	// ConfigStore is and should be handled on a upper level.
	ps := &PlatformService{
		Store:               sc.Store,
		clusterIFace:        sc.Cluster,
		hashSeed:            maphash.MakeSeed(),
		goroutineExitSignal: make(chan struct{}, 1),
		goroutineBuffered:   make(chan struct{}, runtime.NumCPU()),
		WebSocketRouter: &WebSocketRouter{
			handlers: make(map[string]webSocketHandler),
		},
		licenseListeners:          map[string]func(*model.License, *model.License){},
		additionalClusterHandlers: map[model.ClusterEvent]einterfaces.ClusterMessageHandler{},
		statusUpdateChan:          make(chan *model.Status, statusUpdateBufferSize),
		statusUpdateExitSignal:    make(chan struct{}),
		statusUpdateDoneSignal:    make(chan struct{}),
	}

	// Assume the first user account has not been created yet. A call to the DB will later check if this is really the case.
	ps.isFirstUserAccount.Store(true)

	// Apply options, some of the options overrides the default config actually.
	for _, option := range options {
		if err2 := option(ps); err2 != nil {
			return nil, fmt.Errorf("failed to apply option: %w", err2)
		}
	}

	// the config store is not set, we need to create a new one
	if ps.configStore == nil {
		innerStore, err := config.NewFileStore("config.json", true)
		if err != nil {
			return nil, fmt.Errorf("failed to load config from file: %w", err)
		}

		configStore, err := config.NewStoreFromBacking(innerStore, nil, false)
		if err != nil {
			return nil, fmt.Errorf("failed to load config from file: %w", err)
		}

		ps.configStore = configStore
	}

	// Step 1: Start logging.
	// Note: err is declared here and deliberately reused (and captured by the
	// newStore closure) further down.
	err := ps.initLogging()
	if err != nil {
		return nil, fmt.Errorf("failed to initialize logging: %w", err)
	}

	ps.Log().Info("Server is initializing...", mlog.String("go_version", runtime.Version()))

	logCurrentVersion := fmt.Sprintf("Current version is %v (%v/%v/%v/%v)", model.CurrentVersion, model.BuildNumber, model.BuildDate, model.BuildHash, model.BuildHashEnterprise)
	ps.Log().Info(
		logCurrentVersion,
		mlog.String("current_version", model.CurrentVersion),
		mlog.String("build_number", model.BuildNumber),
		mlog.String("build_date", model.BuildDate),
		mlog.String("build_hash", model.BuildHash),
		mlog.String("build_hash_enterprise", model.BuildHashEnterprise),
		mlog.String("service_environment", model.GetServiceEnvironment()),
	)

	if model.BuildEnterpriseReady == "true" {
		isTrial := false
		if licence := ps.License(); licence != nil {
			isTrial = licence.IsTrial
		}
		ps.Log().Info(
			"Enterprise Build",
			mlog.Bool("enterprise_build", true),
			mlog.Bool("is_trial", isTrial),
		)
	} else {
		ps.Log().Info("Team Edition Build", mlog.Bool("enterprise_build", false))
	}

	// Step 2: Cache provider.
	// NOTE(review): if CacheType is neither lru nor redis, neither branch runs,
	// cacheProvider stays nil, and the Connect call below would panic —
	// presumably config validation restricts CacheType to these two values;
	// confirm.
	cacheConfig := ps.configStore.Get().CacheSettings
	if *cacheConfig.CacheType == model.CacheTypeLRU {
		ps.cacheProvider = cache.NewProvider()
	} else if *cacheConfig.CacheType == model.CacheTypeRedis {
		ps.cacheProvider, err = cache.NewRedisProvider(
			&cache.RedisOptions{
				RedisAddr:        *cacheConfig.RedisAddress,
				RedisPassword:    *cacheConfig.RedisPassword,
				RedisDB:          *cacheConfig.RedisDB,
				RedisCachePrefix: *cacheConfig.RedisCachePrefix,
				DisableCache:     *cacheConfig.DisableClientCache,
			},
		)
	}
	if err != nil {
		return nil, fmt.Errorf("unable to create cache provider: %w", err)
	}

	res, err := ps.cacheProvider.Connect()
	if err != nil {
		return nil, fmt.Errorf("unable to connect to cache provider: %w", err)
	}

	ps.Log().Info("Successfully connected to cache backend", mlog.String("backend", *cacheConfig.CacheType), mlog.String("result", res))

	// Step 3: Search Engine
	searchEngine := searchengine.NewBroker(ps.Config())
	ps.SearchEngine = searchEngine

	// Step 4: Init Enterprise
	// Depends on step 3 (s.SearchEngine must be non-nil)
	ps.initEnterprise()

	// Step 5: Init Metrics
	if metricsInterfaceFn != nil && ps.metricsIFace == nil { // if the metrics interface is set by options, do not override it
		ps.metricsIFace = metricsInterfaceFn(ps, *ps.configStore.Get().SqlSettings.DriverName, *ps.configStore.Get().SqlSettings.DataSource)
	}

	ps.cacheProvider.SetMetrics(ps.metricsIFace)

	// Step 6: Store.
	// Depends on Step 0 (config), 1 (cacheProvider), 3 (search engine), 5 (metrics) and cluster.
	if ps.newStore == nil {
		ps.newStore = func() (store.Store, error) {
			// The layer cake is as follows: (From bottom to top)
			// SQL layer
			// |
			// Retry layer
			// |
			// Search layer
			// |
			// Timer layer
			// |
			// Cache layer
			opts := append(ps.storeOptions, sqlstore.WithFeatureFlags(func() *model.FeatureFlags {
				return ps.Config().FeatureFlags
			}))
			// Assigns the outer err captured from New's scope.
			ps.sqlStore, err = sqlstore.New(ps.Config().SqlSettings, ps.Log(), ps.metricsIFace, opts...)
			if err != nil {
				return nil, err
			}

			searchStore := searchlayer.NewSearchLayer(
				retrylayer.New(ps.sqlStore),
				ps.SearchEngine,
				ps.Config(),
			)

			ps.AddConfigListener(func(prevCfg, cfg *model.Config) {
				searchStore.UpdateConfig(cfg)
			})

			lcl, err2 := localcachelayer.NewLocalCacheLayer(
				timerlayer.New(searchStore, ps.metricsIFace),
				ps.metricsIFace,
				ps.clusterIFace,
				ps.cacheProvider,
				ps.Log(),
			)
			if err2 != nil {
				return nil, fmt.Errorf("cannot create local cache layer: %w", err2)
			}

			// Keep the SQL layer's license view in sync, now and on changes.
			license := ps.License()
			ps.sqlStore.UpdateLicense(license)
			ps.AddLicenseListener(func(oldLicense, newLicense *model.License) {
				ps.sqlStore.UpdateLicense(newLicense)
			})

			return lcl, nil
		}
	}

	ps.Store, err = ps.newStore()
	if err != nil {
		return nil, fmt.Errorf("cannot create store: %w", err)
	}

	// Step 7: initialize status and session cache.
	// We need to do this because ps.LoadLicense() called in step 8, could
	// end up calling InvalidateAllCaches, so the status and session caches
	// need to be initialized before that.

	// Note: we hardcode the session and status cache to LRU because they lead
	// to a lot of SCAN calls in case of Redis. We could potentially have a
	// reverse mapping to avoid the scan, but this needs more complicated code.
	// Leaving this for now.
	// (cache.NewProvider() here is a fresh LRU provider, intentionally NOT
	// ps.cacheProvider, which may be Redis.)
	ps.statusCache, err = cache.NewProvider().NewCache(&cache.CacheOptions{
		Name:           "Status",
		Size:           model.StatusCacheSize,
		Striped:        true,
		StripedBuckets: max(runtime.NumCPU()-1, 1),
		DefaultExpiry:  30 * time.Minute,
	})
	if err != nil {
		return nil, fmt.Errorf("unable to create status cache: %w", err)
	}

	ps.sessionCache, err = cache.NewProvider().NewCache(&cache.CacheOptions{
		Name:           "Session",
		Size:           model.SessionCacheSize,
		Striped:        true,
		StripedBuckets: max(runtime.NumCPU()-1, 1),
	})
	if err != nil {
		return nil, fmt.Errorf("could not create session cache: %w", err)
	}

	// Step 8: Init License
	if model.BuildEnterpriseReady == "true" {
		ps.LoadLicense()
	}
	license := ps.License()

	// This is a hack because ideally we wouldn't even have started the Redis client
	// if the license didn't have clustering. But there's an intricate deadlock
	// where license cannot be loaded before store, and store cannot be loaded before
	// cache. So loading license before loading cache is an uphill battle.
	if (license == nil || !*license.Features.Cluster) && *cacheConfig.CacheType == model.CacheTypeRedis && !ps.forceEnableRedis {
		return nil, fmt.Errorf("Redis cannot be used in an instance without a license or a license without clustering")
	}

	// Step 9: Initialize filestore
	if ps.filestore == nil {
		insecure := ps.Config().ServiceSettings.EnableInsecureOutgoingConnections
		backend, err2 := filestore.NewFileBackend(filestore.NewFileBackendSettingsFromConfig(&ps.Config().FileSettings, license != nil && *license.Features.Compliance, insecure != nil && *insecure))
		if err2 != nil {
			return nil, fmt.Errorf("failed to initialize filebackend: %w", err2)
		}

		ps.filestore = backend
	}

	if ps.exportFilestore == nil {
		// Default the export store to the primary store unless a dedicated
		// export backend is configured.
		ps.exportFilestore = ps.filestore
		if *ps.Config().FileSettings.DedicatedExportStore {
			mlog.Info("Setting up dedicated export filestore", mlog.String("driver_name", *ps.Config().FileSettings.ExportDriverName))
			backend, errFileBack := filestore.NewExportFileBackend(filestore.NewExportFileBackendSettingsFromConfig(&ps.Config().FileSettings, license != nil && *license.Features.Compliance, false))
			if errFileBack != nil {
				return nil, fmt.Errorf("failed to initialize export filebackend: %w", errFileBack)
			}

			ps.exportFilestore = backend
		}
	}

	// Step 10: Init Metrics Server depends on step 6 (store) and 8 (license)
	if ps.startMetrics {
		if mErr := ps.resetMetrics(); mErr != nil {
			return nil, mErr
		}

		// Restart the metrics server when its enable flag or listen address changes.
		ps.configStore.AddListener(func(oldCfg, newCfg *model.Config) {
			if *oldCfg.MetricsSettings.Enable != *newCfg.MetricsSettings.Enable || *oldCfg.MetricsSettings.ListenAddress != *newCfg.MetricsSettings.ListenAddress {
				if mErr := ps.resetMetrics(); mErr != nil {
					mlog.Warn("Failed to reset metrics", mlog.Err(mErr))
				}
			}
		})
	}

	// Step 11: Init AsymmetricSigningKey depends on step 6 (store)
	if err = ps.EnsureAsymmetricSigningKey(); err != nil {
		return nil, fmt.Errorf("unable to ensure asymmetric signing key: %w", err)
	}

	ps.Busy = NewBusy(ps.clusterIFace)

	// Enable developer settings and mmctl local mode if this is a "dev" build
	if model.BuildNumber == "dev" {
		ps.UpdateConfig(func(cfg *model.Config) {
			*cfg.ServiceSettings.EnableDeveloper = true
			*cfg.ServiceSettings.EnableLocalMode = true
		})
	}

	// Restart metrics when the license's metrics entitlement flips either way.
	ps.AddLicenseListener(func(oldLicense, newLicense *model.License) {
		wasLicensed := (oldLicense != nil && *oldLicense.Features.Metrics) || (model.BuildNumber == "dev")
		isLicensed := (newLicense != nil && *newLicense.Features.Metrics) || (model.BuildNumber == "dev")

		if wasLicensed == isLicensed || !ps.startMetrics {
			return
		}

		if err := ps.RestartMetrics(); err != nil {
			ps.logger.Error("Failed to reset metrics server", mlog.Err(err))
		}
	})

	// A search engine config failure is logged but not fatal.
	if err := ps.SearchEngine.UpdateConfig(ps.Config()); err != nil {
		ps.logger.Error("Failed to update search engine config", mlog.Err(err))
	}

	searchConfigListenerId, searchLicenseListenerId := ps.StartSearchEngine()
	ps.searchConfigListenerId = searchConfigListenerId
	ps.searchLicenseListenerId = searchLicenseListenerId

	return ps, nil
}
|
|
|
|
// Start launches the status-update processor and websocket hubs, then registers
// config and license listeners that broadcast changes to connected clients.
// Always returns nil.
func (ps *PlatformService) Start(broadcastHooks map[string]BroadcastHook) error {
	// Start the status update processor.
	// Must be done before hub start.
	go ps.processStatusUpdates()

	ps.hubStart(broadcastHooks)

	// On any config change: regenerate the cached client config, broadcast it
	// over websocket (asynchronously via ps.Go), and re-apply logging config.
	ps.configListenerId = ps.AddConfigListener(func(_, _ *model.Config) {
		ps.regenerateClientConfig()

		message := model.NewWebSocketEvent(model.WebsocketEventConfigChanged, "", "", "", nil, "")

		message.Add("config", ps.ClientConfigWithComputed())
		ps.Go(func() {
			ps.Publish(message)
		})

		// A logger reconfiguration failure is logged but does not undo the
		// client-config broadcast above.
		if err := ps.ReconfigureLogger(); err != nil {
			mlog.Error("Error re-configuring logging after config change", mlog.Err(err))
			return
		}
	})

	// On license change: regenerate client config and broadcast the sanitized
	// license to clients (published synchronously, unlike config changes).
	ps.licenseListenerId = ps.AddLicenseListener(func(oldLicense, newLicense *model.License) {
		ps.regenerateClientConfig()

		message := model.NewWebSocketEvent(model.WebsocketEventLicenseChanged, "", "", "", nil, "")
		message.Add("license", ps.GetSanitizedClientLicense())
		ps.Publish(message)
	})
	return nil
}
|
|
|
|
func (ps *PlatformService) ShutdownMetrics() error {
|
|
if ps.metrics != nil {
|
|
return ps.metrics.stopMetricsServer()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ps *PlatformService) ShutdownConfig() error {
|
|
ps.RemoveConfigListener(ps.configListenerId)
|
|
|
|
if ps.configStore != nil {
|
|
err := ps.configStore.Close()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to close config store: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetTelemetryId records the telemetry ID for this instance and runs the
// post-update hook so dependent cached state is refreshed.
func (ps *PlatformService) SetTelemetryId(id string) {
	ps.telemetryId = id

	ps.PostTelemetryIdHook()
}
|
|
|
|
// PostTelemetryIdHook triggers necessary events to propagate the telemetry ID
// (currently it only regenerates the cached client configuration).
func (ps *PlatformService) PostTelemetryIdHook() {
	ps.regenerateClientConfig()
}
|
|
|
|
// SetLogger replaces the service's logger. The write is not synchronized;
// NOTE(review): presumably only called during initialization before concurrent
// use — confirm against callers.
func (ps *PlatformService) SetLogger(logger *mlog.Logger) {
	ps.logger = logger
}
|
|
|
|
// initEnterprise wires up the optional enterprise implementations registered at
// build time via package-level factory variables. Each factory is consulted
// only when non-nil, so team-edition builds leave the corresponding fields nil.
func (ps *PlatformService) initEnterprise() {
	// An already-injected cluster interface (e.g. from ServiceConfig or an
	// option) takes precedence over the registered enterprise implementation.
	if clusterInterface != nil && ps.clusterIFace == nil {
		ps.clusterIFace = clusterInterface(ps)
	}

	if elasticsearchInterface != nil {
		ps.SearchEngine.RegisterElasticsearchEngine(elasticsearchInterface(ps))
	}

	if ldapDiagnosticInterface != nil {
		ps.ldapDiagnostic = ldapDiagnosticInterface(ps)
	}

	if licenseInterface != nil {
		ps.licenseManager = licenseInterface(ps)
	}

	if accessControlServiceInterface != nil {
		ps.pdpService = accessControlServiceInterface(ps)
	}
}
|
|
|
|
func (ps *PlatformService) TotalWebsocketConnections() int {
|
|
// This method is only called after the hub is initialized.
|
|
// Therefore, no mutex is needed to protect s.hubs.
|
|
count := int64(0)
|
|
for _, hub := range ps.hubs {
|
|
count = count + atomic.LoadInt64(&hub.connectionCount)
|
|
}
|
|
|
|
return int(count)
|
|
}
|
|
|
|
// Shutdown stops the platform service in dependency order: hubs first, then the
// status processor, the license listener, in-flight goroutines, the store, and
// finally the cache provider. Returns an error only if the cache provider fails
// to close cleanly.
func (ps *PlatformService) Shutdown() error {
	ps.HubStop()

	// Shutdown status processor.
	// Must be done after hub shutdown.
	close(ps.statusUpdateExitSignal)
	// wait for it to be stopped.
	<-ps.statusUpdateDoneSignal

	ps.RemoveLicenseListener(ps.licenseListenerId)

	// we need to wait the goroutines to finish before closing the store
	// and this needs to be called after hub stop because hub generates goroutines
	// when it is active. If we wait first we have no mechanism to prevent adding
	// more go routines hence they still going to be invoked.
	ps.waitForGoroutines()

	if ps.Store != nil {
		ps.Store.Close()
	}

	if ps.cacheProvider != nil {
		if err := ps.cacheProvider.Close(); err != nil {
			return fmt.Errorf("unable to cleanly shutdown cache: %w", err)
		}
	}

	return nil
}
|
|
|
|
// CacheProvider returns the cache provider backing the platform's caches.
func (ps *PlatformService) CacheProvider() cache.Provider {
	return ps.cacheProvider
}
|
|
|
|
// SetSqlStore is used for plugin testing. It swaps only the bottom SQL layer;
// the decorated Store field is left untouched.
func (ps *PlatformService) SetSqlStore(s *sqlstore.SqlStore) {
	ps.sqlStore = s
}
|
|
|
|
// SetSharedChannelService installs the shared channel service implementation.
// Guarded by shareChannelServiceMux, so it is safe to call concurrently with
// GetSharedChannelService.
func (ps *PlatformService) SetSharedChannelService(s SharedChannelServiceIFace) {
	ps.shareChannelServiceMux.Lock()
	defer ps.shareChannelServiceMux.Unlock()
	ps.sharedChannelService = s
}
|
|
|
|
// GetSharedChannelService returns the installed shared channel service (nil if
// none was set). Guarded by shareChannelServiceMux, so it is safe to call
// concurrently with SetSharedChannelService.
func (ps *PlatformService) GetSharedChannelService() SharedChannelServiceIFace {
	ps.shareChannelServiceMux.RLock()
	defer ps.shareChannelServiceMux.RUnlock()
	return ps.sharedChannelService
}
|
|
|
|
// SetPluginsEnvironment injects the hook runner through which the platform
// reaches the plugin environment. The write is not synchronized;
// NOTE(review): presumably set once during startup before concurrent reads —
// confirm.
func (ps *PlatformService) SetPluginsEnvironment(runner HookRunner) {
	ps.pluginEnv = runner
}
|
|
|
|
// GetPluginStatuses meant to be used by cluster implementation
|
|
func (ps *PlatformService) GetPluginStatuses() (model.PluginStatuses, *model.AppError) {
|
|
if ps.pluginEnv == nil || ps.pluginEnv.GetPluginsEnvironment() == nil {
|
|
return nil, model.NewAppError("GetPluginStatuses", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
|
|
}
|
|
|
|
pluginStatuses, err := ps.pluginEnv.GetPluginsEnvironment().Statuses()
|
|
if err != nil {
|
|
return nil, model.NewAppError("GetPluginStatuses", "app.plugin.get_statuses.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
|
|
}
|
|
|
|
// Add our cluster ID
|
|
for _, status := range pluginStatuses {
|
|
if ps.Cluster() != nil {
|
|
status.ClusterId = ps.Cluster().GetClusterId()
|
|
} else {
|
|
status.ClusterId = ""
|
|
}
|
|
}
|
|
|
|
return pluginStatuses, nil
|
|
}
|
|
|
|
func (ps *PlatformService) getPluginManifests() ([]*model.Manifest, error) {
|
|
if ps.pluginEnv == nil {
|
|
return nil, errors.New("plugin environment not initialized")
|
|
}
|
|
|
|
pluginsEnvironment := ps.pluginEnv.GetPluginsEnvironment()
|
|
if pluginsEnvironment == nil {
|
|
return nil, model.NewAppError("getPluginManifests", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
|
|
}
|
|
|
|
plugins, err := pluginsEnvironment.Available()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get list of available plugins: %w", err)
|
|
}
|
|
|
|
manifests := make([]*model.Manifest, len(plugins))
|
|
for i := range plugins {
|
|
manifests[i] = plugins[i].Manifest
|
|
}
|
|
|
|
return manifests, nil
|
|
}
|
|
|
|
// FileBackend returns the primary file storage backend.
func (ps *PlatformService) FileBackend() filestore.FileBackend {
	return ps.filestore
}
|
|
|
|
// ExportFileBackend returns the backend used for exports; this is the primary
// backend unless a dedicated export store was configured at init time.
func (ps *PlatformService) ExportFileBackend() filestore.FileBackend {
	return ps.exportFilestore
}
|
|
|
|
// LdapDiagnostic returns the enterprise LDAP diagnostic implementation, or nil
// when none was registered (e.g. team-edition builds).
func (ps *PlatformService) LdapDiagnostic() einterfaces.LdapDiagnosticInterface {
	return ps.ldapDiagnostic
}
|
|
|
|
// DatabaseTypeAndSchemaVersion returns the database type and current version of the schema
|
|
func (ps *PlatformService) DatabaseTypeAndSchemaVersion() (string, string, error) {
|
|
schemaVersion, err := ps.Store.GetDBSchemaVersion()
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
|
|
return model.SafeDereference(ps.Config().SqlSettings.DriverName), strconv.Itoa(schemaVersion), nil
|
|
}
|