mattermost/server/channels/app/server.go
Pavel Zeman 6fdef8c9cc
ci: enable fullyparallel mode for server tests (#35816)
* ci: enable fullyparallel mode for server tests

Replace os.Setenv, os.Chdir, and global state mutations with
parallel-safe alternatives (t.Setenv, t.Chdir, test hooks) across
37 files. Refactor GetLogRootPath and MM_INSTALL_TYPE to use
package-level test hooks instead of environment variables.

This enables gotestsum --fullparallel, allowing all test packages
to run with maximum parallelism within each shard.

Co-authored-by: Claude <claude@anthropic.com>

* ci: split fullyparallel from continue-on-error in workflow template

- Add new boolean input 'allow-failure' separate from 'fullyparallel'
- Change continue-on-error to use allow-failure instead of fullyparallel
- Update server-ci.yml to pass allow-failure: true for test coverage job
- Allows independent control of parallel execution and failure tolerance

Co-authored-by: Claude <claude@anthropic.com>

* fix: protect TestOverrideLogRootPath with sync.Mutex for parallel tests

- Replace global var TestOverrideLogRootPath with mutex-protected functions
- Add SetTestOverrideLogRootPath() and getTestOverrideLogRootPath() functions
- Update GetLogRootPath() to use thread-safe getter
- Update all test files to use SetTestOverrideLogRootPath() with t.Cleanup()
- Fixes race condition when running tests with t.Parallel()

Co-authored-by: Claude <claude@anthropic.com>

* fix: configure audit settings before server setup in tests

- Move ExperimentalAuditSettings from UpdateConfig() to config defaults
- Pass audit config via app.Config() option in SetupWithServerOptions()
- Fixes audit test setup ordering to configure BEFORE server initialization
- Resolves CodeRabbit's audit config timing issue in api4 tests

Co-authored-by: Claude <claude@anthropic.com>

* fix: implement SetTestOverrideLogRootPath mutex in logger.go

The previous commit updated test callers to use SetTestOverrideLogRootPath()
but didn't actually create the function in config/logger.go, causing build
failures across all CI shards. This commit:

- Replaces the exported var TestOverrideLogRootPath with mutex-protected
  unexported state (testOverrideLogRootPath + testOverrideLogRootMu)
- Adds exported SetTestOverrideLogRootPath() setter
- Adds unexported getTestOverrideLogRootPath() getter
- Updates GetLogRootPath() to use the thread-safe getter
- Fixes log_test.go callers that were missed in the previous commit

Co-authored-by: Claude <claude@anthropic.com>

* fix(test): use SetupConfig for access_control feature flag registration

InitAccessControlPolicy() checks FeatureFlags.AttributeBasedAccessControl
at route registration time during server startup. Setting the flag via
UpdateConfig after Setup() is too late — routes are never registered
and API calls return 404.

Use SetupConfig() to pass the feature flag in the initial config before
server startup, ensuring routes are properly registered.

Co-authored-by: Claude <claude@anthropic.com>

* fix(test): restore BurnOnRead flag state in TestRevealPost subtest

The 'feature not enabled' subtest disables BurnOnRead without restoring
it via t.Cleanup. Subsequent subtests inherit the disabled state, which
can cause 501 errors when they expect the feature to be available.

Add t.Cleanup to restore FeatureFlags.BurnOnRead = true after the
subtest completes.

Co-authored-by: Claude <claude@anthropic.com>

* fix(test): restore EnableSharedChannelsMemberSync flag via t.Cleanup

The test disables EnableSharedChannelsMemberSync without restoring it.
If the subtest exits early (e.g., require failure), later sibling
subtests inherit a disabled flag and become flaky.

Add t.Cleanup to restore the flag after the subtest completes.

Co-authored-by: Claude <claude@anthropic.com>

* Fix test parallelism: use instance-scoped overrides and init-time audit config

  Replace package-level test globals (TestOverrideInstallType,
  SetTestOverrideLogRootPath) with fields on PlatformService so each test
  gets its own instance without process-wide mutation. Fix three audit
  tests (TestUserLoginAudit, TestLogoutAuditAuthStatus,
  TestUpdatePasswordAudit) that configured the audit logger after server
  init — the audit logger only reads config at startup, so pass audit
  settings via app.Config() at init time instead.

  Also revert the Go 1.24.13 downgrade and bump mattermost-govet to
  v2.0.2 for Go 1.25.8 compatibility.

* Fix audit unit tests

* Fix MMCLOUDURL unit tests

* Fixed unit tests using MM_NOTIFY_ADMIN_COOL_OFF_DAYS

* Make app migrations idempotent for parallel test safety

  Change System().Save() to System().SaveOrUpdate() in all migration
  completion markers. When two parallel tests share a database pool entry,
  both may race through the check-then-insert migration pattern. Save()
  causes a duplicate key fatal crash; SaveOrUpdate() makes the second
  write a harmless no-op.

* test: address review feedback on fullyparallel PR

- Use SetLogRootPathOverride() setter instead of direct field access
  in platform/support_packet_test.go and platform/log_test.go (pvev)
- Restore TestGetLogRootPath in config/logger_test.go to keep
  MM_LOG_PATH env var coverage; test uses t.Setenv so it runs
  serially which is fine (pvev)
- Fix misleading comment in config_test.go: code uses t.Setenv,
  not os.Setenv (jgheithcock)

Co-authored-by: Claude <claude@anthropic.com>

* fix: add missing os import in post_test.go

The os import was dropped during a merge conflict resolution while
burn-on-read shared channel tests from master still use os.Setenv.

Co-authored-by: Claude <claude@anthropic.com>

---------

Co-authored-by: Claude <claude@anthropic.com>
Co-authored-by: wiggin77 <wiggin77@warpmail.net>
Co-authored-by: Mattermost Build <build@mattermost.com>
2026-04-08 20:48:36 -04:00

1921 lines
61 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package app
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"strings"
"sync"
"syscall"
"time"
"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/rs/cors"
"golang.org/x/crypto/acme/autocert"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/httpservice"
"github.com/mattermost/mattermost/server/public/shared/i18n"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/public/shared/timezones"
"github.com/mattermost/mattermost/server/v8/channels/app/email"
"github.com/mattermost/mattermost/server/v8/channels/app/platform"
"github.com/mattermost/mattermost/server/v8/channels/app/properties"
"github.com/mattermost/mattermost/server/v8/channels/app/teams"
"github.com/mattermost/mattermost/server/v8/channels/app/users"
"github.com/mattermost/mattermost/server/v8/channels/audit"
"github.com/mattermost/mattermost/server/v8/channels/jobs"
"github.com/mattermost/mattermost/server/v8/channels/jobs/active_users"
"github.com/mattermost/mattermost/server/v8/channels/jobs/cleanup_desktop_tokens"
"github.com/mattermost/mattermost/server/v8/channels/jobs/delete_dms_preferences_migration"
"github.com/mattermost/mattermost/server/v8/channels/jobs/delete_empty_drafts_migration"
"github.com/mattermost/mattermost/server/v8/channels/jobs/delete_expired_posts"
"github.com/mattermost/mattermost/server/v8/channels/jobs/delete_orphan_drafts_migration"
"github.com/mattermost/mattermost/server/v8/channels/jobs/expirynotify"
"github.com/mattermost/mattermost/server/v8/channels/jobs/export_delete"
"github.com/mattermost/mattermost/server/v8/channels/jobs/export_process"
"github.com/mattermost/mattermost/server/v8/channels/jobs/export_users_to_csv"
"github.com/mattermost/mattermost/server/v8/channels/jobs/extract_content"
"github.com/mattermost/mattermost/server/v8/channels/jobs/hosted_purchase_screening"
"github.com/mattermost/mattermost/server/v8/channels/jobs/import_delete"
"github.com/mattermost/mattermost/server/v8/channels/jobs/import_process"
"github.com/mattermost/mattermost/server/v8/channels/jobs/last_accessible_file"
"github.com/mattermost/mattermost/server/v8/channels/jobs/last_accessible_post"
"github.com/mattermost/mattermost/server/v8/channels/jobs/migrations"
"github.com/mattermost/mattermost/server/v8/channels/jobs/mobile_session_metadata"
"github.com/mattermost/mattermost/server/v8/channels/jobs/notify_admin"
"github.com/mattermost/mattermost/server/v8/channels/jobs/plugins"
"github.com/mattermost/mattermost/server/v8/channels/jobs/post_persistent_notifications"
"github.com/mattermost/mattermost/server/v8/channels/jobs/product_notices"
"github.com/mattermost/mattermost/server/v8/channels/jobs/recap"
"github.com/mattermost/mattermost/server/v8/channels/jobs/refresh_materialized_views"
"github.com/mattermost/mattermost/server/v8/channels/jobs/resend_invitation_email"
"github.com/mattermost/mattermost/server/v8/channels/jobs/s3_path_migration"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/channels/utils"
"github.com/mattermost/mattermost/server/v8/config"
"github.com/mattermost/mattermost/server/v8/einterfaces"
"github.com/mattermost/mattermost/server/v8/platform/services/awsmeter"
"github.com/mattermost/mattermost/server/v8/platform/services/cache"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
"github.com/mattermost/mattermost/server/v8/platform/services/sharedchannel"
"github.com/mattermost/mattermost/server/v8/platform/services/telemetry"
"github.com/mattermost/mattermost/server/v8/platform/services/upgrader"
"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
"github.com/mattermost/mattermost/server/v8/platform/shared/mail"
"github.com/mattermost/mattermost/server/v8/platform/shared/templates"
)
const (
scheduledPostJobInterval = 5 * time.Minute
debugScheduledPostJobInterval = 2 * time.Second
)
var SentryDSN = "https://eaf281226106b5bba68694d1316da21c@o94110.ingest.us.sentry.io/5212327"
type Server struct {
// RootRouter is the starting point for all HTTP requests to the server.
RootRouter *mux.Router
// LocalRouter is the starting point for all the local UNIX socket
// requests to the server
LocalRouter *mux.Router
// Router is the starting point for all web, api4 and ws requests to the server. It differs
// from RootRouter only if the SiteURL contains a /subpath.
Router *mux.Router
Server *http.Server
ListenAddr *net.TCPAddr
RateLimiter *RateLimiter
localModeServer *http.Server
didFinishListen chan struct{}
EmailService email.ServiceInterface
httpService httpservice.HTTPService
PushNotificationsHub PushNotificationsHub
pushNotificationClient *http.Client // TODO: move this to it's own package
outgoingWebhookClient *http.Client
runEssentialJobs bool
Jobs *jobs.JobServer
timezones *timezones.Timezones
htmlTemplates *templates.Container
seenPendingPostIdsCache cache.Cache
openGraphDataCache cache.Cache
clusterLeaderListenerId string
loggerLicenseListenerId string
platform *platform.PlatformService
platformOptions []platform.Option
telemetryService *telemetry.TelemetryService
userService *users.UserService
teamService *teams.TeamService
propertyService *properties.PropertyService
serviceMux sync.RWMutex
remoteClusterService remotecluster.RemoteClusterServiceIFace
sharedChannelService SharedChannelServiceIFace // TODO: platform: move to platform package
phase2PermissionsMigrationComplete bool
Audit *audit.Audit
joinCluster bool
skipPostInit bool
Cloud einterfaces.CloudInterface
IPFiltering einterfaces.IPFilteringInterface
OutgoingOAuthConnection einterfaces.OutgoingOAuthConnectionInterface
PushProxy einterfaces.PushProxyInterface
AutoTranslation einterfaces.AutoTranslationInterface
agentsBridgeOverride AgentsBridge
ch *Channels
// cwsTokenOverride overrides CWS_CLOUD_TOKEN for CWS login authentication.
cwsTokenOverride string
// notifyAdminCoolOffDaysOverride overrides MM_NOTIFY_ADMIN_COOL_OFF_DAYS.
notifyAdminCoolOffDaysOverride string
}
// SetCWSTokenOverride sets the CWS token override for CWS login authentication.
func (s *Server) SetCWSTokenOverride(v string) {
s.cwsTokenOverride = v
}
// SetNotifyAdminCoolOffDaysOverride sets the cool-off period override for admin notifications.
func (s *Server) SetNotifyAdminCoolOffDaysOverride(v string) {
s.notifyAdminCoolOffDaysOverride = v
}
func (s *Server) Store() store.Store {
if s.platform != nil {
return s.platform.Store
}
return nil
}
func (s *Server) PropertyService() *properties.PropertyService {
return s.propertyService
}
func (s *Server) SetStore(st store.Store) {
if s.platform != nil {
s.platform.Store = st
}
}
func NewServer(options ...Option) (*Server, error) {
rootRouter := mux.NewRouter()
localRouter := mux.NewRouter()
s := &Server{
RootRouter: rootRouter,
LocalRouter: localRouter,
timezones: timezones.New(),
}
for _, option := range options {
if err := option(s); err != nil {
return nil, errors.Wrap(err, "failed to apply option")
}
}
// Following outlines the specific set of steps
// performed during server bootup. They are sensitive to order
// and has dependency requirements with the previous step.
//
// Step 1: Platform.
if s.platform == nil {
ps, sErr := platform.New(platform.ServiceConfig{}, s.platformOptions...)
if sErr != nil {
return nil, errors.Wrap(sErr, "failed to initialize platform")
}
s.platform = ps
}
subpath, err := utils.GetSubpathFromConfig(s.platform.Config())
if err != nil {
return nil, errors.Wrap(err, "failed to parse SiteURL subpath")
}
s.Router = s.RootRouter.PathPrefix(subpath).Subrouter()
s.httpService = httpservice.MakeHTTPService(s.platform)
// Step 2: Init Enterprise
// Depends on step 1 (s.Platform must be non-nil)
s.initEnterprise()
// Needed to run before loading license.
s.userService, err = users.New(users.ServiceConfig{
UserStore: s.Store().User(),
SessionStore: s.Store().Session(),
OAuthStore: s.Store().OAuth(),
ConfigFn: s.platform.Config,
Metrics: s.GetMetrics(),
Cluster: s.platform.Cluster(),
LicenseFn: s.License,
})
if err != nil {
return nil, errors.Wrapf(err, "unable to create users service")
}
s.teamService, err = teams.New(teams.ServiceConfig{
TeamStore: s.Store().Team(),
ChannelStore: s.Store().Channel(),
GroupStore: s.Store().Group(),
Users: s.userService,
WebHub: s.platform,
ConfigFn: s.platform.Config,
LicenseFn: s.License,
})
if err != nil {
return nil, errors.Wrapf(err, "unable to create teams service")
}
s.propertyService, err = properties.New(properties.ServiceConfig{
PropertyGroupStore: s.Store().PropertyGroup(),
PropertyFieldStore: s.Store().PropertyField(),
PropertyValueStore: s.Store().PropertyValue(),
CallerIDExtractor: func(rctx request.CTX) string {
callerID, _ := CallerIDFromRequestContext(rctx)
return callerID
},
})
if err != nil {
return nil, errors.Wrapf(err, "unable to create properties service")
}
propertyAccessService := properties.NewPropertyAccessService(s.propertyService, func(pluginID string) bool {
if s.ch == nil {
return false
}
_, err := s.ch.GetPluginStatus(pluginID)
return err == nil
})
s.propertyService.SetPropertyAccessService(propertyAccessService)
// Register builtin property groups after fully initializing the propertyService
if err = s.propertyService.RegisterBuiltinGroups([]*model.PropertyGroup{
{Name: model.CustomProfileAttributesPropertyGroupName},
{Name: model.ContentFlaggingGroupName},
}); err != nil {
return nil, errors.Wrap(err, "failed to register builtin property groups")
}
// It is important to initialize the hub only after the global logger is set
// to avoid race conditions while logging from inside the hub.
// Step 4: Start platform
if err = s.platform.Start(s.makeBroadcastHooks()); err != nil {
return nil, errors.Wrap(err, "failed to start platform")
}
// NOTE: There should be no call to App.Srv().Channels() before step 5 is done
// otherwise it will throw a panic.
// Step 5: Initialize channels.
// Depends on s.httpService, and depends on the hub to be initialized.
// Otherwise we run into race conditions.
channels, err := NewChannels(s)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize channels")
}
s.ch = channels
// After channel is initialized set it to the App object
app := New(ServerConnector(channels))
// -------------------------------------------------------------------------
// Everything below this is not order sensitive and safe to be moved around.
// If you are adding a new field that is non-channels specific, please add
// below this. Otherwise, please add it to Channels struct in app/channels.go.
// -------------------------------------------------------------------------
if *s.platform.Config().LogSettings.EnableDiagnostics && *s.platform.Config().LogSettings.EnableSentry {
switch model.GetServiceEnvironment() {
case model.ServiceEnvironmentDev:
mlog.Warn("Sentry reporting is enabled, but service environment is dev. Disabling reporting.")
case model.ServiceEnvironmentProduction, model.ServiceEnvironmentTest:
if err2 := sentry.Init(sentry.ClientOptions{
Dsn: SentryDSN,
Release: model.BuildHash,
AttachStacktrace: true,
BeforeSend: func(event *sentry.Event, hint *sentry.EventHint) *sentry.Event {
// sanitize data sent to sentry to reduce exposure of PII
if event.Request != nil {
event.Request.Cookies = ""
event.Request.QueryString = ""
event.Request.Headers = nil
event.Request.Data = ""
}
return event
},
EnableTracing: false,
TracesSampler: sentry.TracesSampler(func(ctx sentry.SamplingContext) float64 {
return 0.0
}),
}); err2 != nil {
mlog.Warn("Sentry could not be initiated, probably bad DSN?", mlog.Err(err2))
}
}
}
s.pushNotificationClient = s.httpService.MakeClient(true)
s.outgoingWebhookClient = s.httpService.MakeClient(false)
if err2 := utils.TranslationsPreInit(); err2 != nil {
return nil, errors.Wrapf(err2, "unable to load Mattermost translation files")
}
model.AppErrorInit(i18n.T)
if s.seenPendingPostIdsCache, err = s.platform.CacheProvider().NewCache(&cache.CacheOptions{
Name: "seen_pending_post_ids",
Size: PendingPostIDsCacheSize,
}); err != nil {
return nil, errors.Wrap(err, "Unable to create pending post ids cache")
}
if s.openGraphDataCache, err = s.platform.CacheProvider().NewCache(&cache.CacheOptions{
Name: "opengraph_data",
Size: openGraphMetadataCacheSize,
}); err != nil {
return nil, errors.Wrap(err, "Unable to create opengraphdata cache")
}
s.createPushNotificationsHub(request.EmptyContext(s.Log()))
if err = i18n.InitTranslations(*s.platform.Config().LocalizationSettings.DefaultServerLocale, *s.platform.Config().LocalizationSettings.DefaultClientLocale); err != nil {
return nil, errors.Wrapf(err, "unable to load Mattermost translation files")
}
templatesDir, ok := templates.GetTemplateDirectory()
if !ok {
return nil, errors.New("Failed find server templates in \"templates\" directory")
}
htmlTemplates, err := templates.New(templatesDir)
if err != nil {
return nil, errors.Wrap(err, "cannot initialize server templates")
}
s.htmlTemplates = htmlTemplates
s.telemetryService, err = telemetry.New(New(ServerConnector(s.Channels())), s.Store(), s.Log())
if err != nil {
return nil, errors.Wrapf(err, "unable to initialize telemetry service")
}
s.platform.SetTelemetryId(s.ServerId()) // TODO: move this into platform once telemetry service moved to platform.
emailService, err := email.NewService(email.ServiceConfig{
ConfigFn: s.platform.Config,
LicenseFn: s.License,
TemplatesContainer: s.TemplatesContainer(),
UserService: s.userService,
Store: s.GetStore(),
})
if err != nil {
return nil, errors.Wrapf(err, "unable to initialize email service")
}
s.EmailService = emailService
s.platform.SetupFeatureFlags()
s.initJobs()
if ipFilteringInterface != nil {
s.IPFiltering = ipFilteringInterface(app)
}
if outgoingOauthConnectionInterface != nil {
s.OutgoingOAuthConnection = outgoingOauthConnectionInterface(app)
}
s.clusterLeaderListenerId = s.AddClusterLeaderChangedListener(func() {
mlog.Info("Cluster leader changed. Determining if job schedulers should be running:", mlog.Bool("isLeader", s.IsLeader()))
if s.Jobs != nil {
s.Jobs.HandleClusterLeaderChange(s.IsLeader())
}
s.platform.SetupFeatureFlags()
})
// If configured with a subpath, redirect 404s at the root back into the subpath.
if subpath != "/" {
s.RootRouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = path.Join(subpath, r.URL.Path)
http.Redirect(w, r, r.URL.String(), http.StatusFound)
})
}
if _, err = url.ParseRequestURI(*s.platform.Config().ServiceSettings.SiteURL); err != nil {
// Don't spam the logs when in CI or local testing mode
if !(os.Getenv("IS_CI") == "true" || os.Getenv("IS_LOCAL_TESTING") == "true") {
mlog.Error("SiteURL must be set. Some features will operate incorrectly if the SiteURL is not set. See documentation for details: https://mattermost.com/pl/configure-site-url")
}
}
// Start email batching because it's not like the other jobs
s.platform.AddConfigListener(func(_, _ *model.Config) {
s.EmailService.InitEmailBatching()
})
pwd, _ := os.Getwd()
mlog.Info("Printing current working", mlog.String("directory", pwd))
mlog.Info("Loaded config", mlog.String("source", s.platform.DescribeConfig()))
license := s.License()
allowAdvancedLogging := license != nil && *license.Features.AdvancedLogging
if s.Audit == nil {
s.Audit = &audit.Audit{}
s.Audit.Init(audit.DefMaxQueueSize)
if err = s.configureAudit(s.Audit, allowAdvancedLogging); err != nil {
mlog.Error("Error configuring audit", mlog.Err(err))
}
}
s.platform.RemoveUnlicensedLogTargets(license)
s.platform.EnableLoggingMetrics()
s.loggerLicenseListenerId = s.AddLicenseListener(func(oldLicense, newLicense *model.License) {
s.platform.RemoveUnlicensedLogTargets(newLicense)
s.platform.EnableLoggingMetrics()
})
// if enabled - perform initial product notices fetch
if *s.platform.Config().AnnouncementSettings.AdminNoticesEnabled || *s.platform.Config().AnnouncementSettings.UserNoticesEnabled {
s.platform.Go(func() {
appInstance := New(ServerConnector(s.Channels()))
if err := appInstance.UpdateProductNotices(); err != nil {
mlog.Warn("Failed to perform initial product notices fetch", mlog.Err(err))
}
})
}
if s.skipPostInit {
return s, nil
}
s.platform.AddConfigListener(func(oldCfg, newCfg *model.Config) {
appInstance := New(ServerConnector(s.Channels()))
if *oldCfg.GuestAccountsSettings.Enable && !*newCfg.GuestAccountsSettings.Enable {
c := request.EmptyContext(s.Log())
if appErr := appInstance.DeactivateGuests(c); appErr != nil {
mlog.Error("Unable to deactivate guest accounts", mlog.Err(appErr))
}
} else if *oldCfg.GuestAccountsSettings.EnableGuestMagicLink && !*newCfg.GuestAccountsSettings.EnableGuestMagicLink {
// Only run this if guest magic link accounts are still enabled
c := request.EmptyContext(s.Log())
if appErr := appInstance.DeactivateMagicLinkGuests(c); appErr != nil {
mlog.Error("Unable to deactivate guest magic link accounts", mlog.Err(appErr))
}
}
})
// Disable active guest accounts on first run if guest accounts are disabled
if !*s.platform.Config().GuestAccountsSettings.Enable {
appInstance := New(ServerConnector(s.Channels()))
c := request.EmptyContext(s.Log())
if appErr := appInstance.DeactivateGuests(c); appErr != nil {
mlog.Error("Unable to deactivate guest accounts", mlog.Err(appErr))
}
} else if !*s.platform.Config().GuestAccountsSettings.EnableGuestMagicLink {
// Disable guest magic link accounts on first run if guest magic link accounts are disabled
// and only if guest accounts are still enabled
appInstance := New(ServerConnector(s.Channels()))
c := request.EmptyContext(s.Log())
if appErr := appInstance.DeactivateMagicLinkGuests(c); appErr != nil {
mlog.Error("Unable to deactivate guest magic link accounts", mlog.Err(appErr))
}
}
if s.runEssentialJobs {
s.runJobs()
}
s.doAppMigrations()
s.initPostMetadata()
// Dump the image cache if the proxy settings have changed. (need switch URLs to the correct proxy)
s.platform.AddConfigListener(func(oldCfg, newCfg *model.Config) {
if (oldCfg.ImageProxySettings.Enable != newCfg.ImageProxySettings.Enable) ||
(oldCfg.ImageProxySettings.ImageProxyType != newCfg.ImageProxySettings.ImageProxyType) ||
(oldCfg.ImageProxySettings.RemoteImageProxyURL != newCfg.ImageProxySettings.RemoteImageProxyURL) ||
(oldCfg.ImageProxySettings.RemoteImageProxyOptions != newCfg.ImageProxySettings.RemoteImageProxyOptions) {
if err = s.openGraphDataCache.Purge(); err != nil {
mlog.Error("Failed to purge Open Graph data cache after config change", mlog.Err(err))
}
}
})
return s, nil
}
func (s *Server) runJobs() {
s.runLicenseExpirationCheckJob()
s.Go(func() {
appInstance := New(ServerConnector(s.Channels()))
runDNDStatusExpireJob(appInstance)
runPostReminderJob(appInstance)
runScheduledPostJob(appInstance)
})
s.Go(func() {
runSecurityJob(s)
})
s.Go(func() {
runSessionCleanupJob(s)
})
s.Go(func() {
runJobsCleanupJob(s)
})
s.Go(func() {
runTokenCleanupJob(s)
})
s.Go(func() {
runCommandWebhookCleanupJob(s)
})
s.Go(func() {
runConfigCleanupJob(s)
})
s.Go(func() {
runCloudUserCountReportJob(s)
})
if complianceI := s.Channels().Compliance; complianceI != nil {
go complianceI.StartComplianceDailyJob()
}
if *s.platform.Config().JobSettings.RunJobs && s.Jobs != nil {
if err := s.Jobs.StartWorkers(); err != nil {
mlog.Error("Failed to start job server workers", mlog.Err(err))
}
}
if *s.platform.Config().JobSettings.RunScheduler && s.Jobs != nil {
if err := s.Jobs.StartSchedulers(); err != nil {
mlog.Error("Failed to start job server schedulers", mlog.Err(err))
}
}
if *s.platform.Config().ServiceSettings.EnableAWSMetering {
runReportToAWSMeterJob(s)
}
}
// Global app options that should be applied to apps created by this server
func (s *Server) AppOptions() []AppOption {
return []AppOption{
ServerConnector(s.Channels()),
}
}
func (s *Server) Channels() *Channels {
return s.ch
}
func (s *Server) startInterClusterServices(license *model.License) error {
if license == nil {
mlog.Debug("No license provided; Remote Cluster services disabled")
return nil
}
// Remote Cluster service
// License check (assume enabled if shared channels enabled)
if !license.HasRemoteClusterService() && !license.HasSharedChannels() {
mlog.Debug("License does not have Remote Cluster services enabled")
return nil
}
// Config check
if !*s.platform.Config().ConnectedWorkspacesSettings.EnableRemoteClusterService && !*s.platform.Config().ConnectedWorkspacesSettings.EnableSharedChannels {
mlog.Debug("Remote Cluster Service disabled via config")
return nil
}
var err error
appInstance := New(ServerConnector(s.Channels()))
rcs, err := remotecluster.NewRemoteClusterService(s, appInstance)
if err != nil {
return err
}
if err = rcs.Start(); err != nil {
return err
}
s.serviceMux.Lock()
s.remoteClusterService = rcs
s.serviceMux.Unlock()
// Shared Channels service (depends on remote cluster service)
// License check
if !license.HasSharedChannels() {
mlog.Debug("License does not have shared channels enabled")
return nil
}
// Config check
if !*s.platform.Config().ConnectedWorkspacesSettings.EnableSharedChannels {
mlog.Debug("Shared Channels Service disabled via config")
return nil
}
scs, err := sharedchannel.NewSharedChannelService(s, s.Platform(), appInstance)
if err != nil {
return err
}
s.platform.SetSharedChannelService(scs)
if err = scs.Start(); err != nil {
return err
}
s.serviceMux.Lock()
s.sharedChannelService = scs
s.serviceMux.Unlock()
return nil
}
const TimeToWaitForConnectionsToCloseOnServerShutdown = time.Second
func (s *Server) StopHTTPServer() {
if s.Server != nil {
ctx, cancel := context.WithTimeout(context.Background(), TimeToWaitForConnectionsToCloseOnServerShutdown)
defer cancel()
didShutdown := false
for s.didFinishListen != nil && !didShutdown {
if err := s.Server.Shutdown(ctx); err != nil {
mlog.Warn("Unable to shutdown server", mlog.Err(err))
}
timer := time.NewTimer(time.Millisecond * 50)
select {
case <-s.didFinishListen:
didShutdown = true
case <-timer.C:
}
timer.Stop()
}
s.Server.Close()
s.Server = nil
}
}
func (s *Server) Shutdown() {
s.Log().Info("Stopping Server...")
defer sentry.Flush(2 * time.Second)
s.RemoveLicenseListener(s.loggerLicenseListenerId)
s.RemoveClusterLeaderChangedListener(s.clusterLeaderListenerId)
var err error
s.serviceMux.RLock()
if s.sharedChannelService != nil {
if err = s.sharedChannelService.Shutdown(); err != nil {
s.Log().Error("Error shutting down shared channel services", mlog.Err(err))
}
}
if s.remoteClusterService != nil {
if err = s.remoteClusterService.Shutdown(); err != nil {
s.Log().Error("Error shutting down intercluster services", mlog.Err(err))
}
}
if s.AutoTranslation != nil {
if err = s.AutoTranslation.Shutdown(); err != nil {
s.Log().Error("Error shutting down auto-translation service", mlog.Err(err))
}
}
s.serviceMux.RUnlock()
s.StopHTTPServer()
s.stopLocalModeServer()
// Push notification hub needs to be shutdown after HTTP server
// to prevent stray requests from generating a push notification after it's shut down.
s.StopPushNotificationsHubWorkers()
s.platform.StopSearchEngine()
if err = s.Audit.Shutdown(); err != nil {
s.Log().Warn("Failed to shut down audit", mlog.Err(err))
}
s.platform.StopFeatureFlagUpdateJob()
if err = s.platform.ShutdownConfig(); err != nil {
s.Log().Warn("Failed to shut down config store", mlog.Err(err))
}
if s.platform.Cluster() != nil {
s.platform.Cluster().StopInterNodeCommunication()
}
if err = s.platform.ShutdownMetrics(); err != nil {
s.Log().Warn("Failed to stop metrics server", mlog.Err(err))
}
// Stopping email service after HTTP server has stopped to prevent
// any stray notifications from being queued.
s.EmailService.Stop()
// This must be done after the cluster is stopped.
if s.Jobs != nil {
// For simplicity we don't check if workers and schedulers are active
// before stopping them as both calls essentially become no-ops
// if nothing is running.
if err = s.Jobs.StopWorkers(); err != nil && !errors.Is(err, jobs.ErrWorkersNotRunning) {
s.Log().Warn("Failed to stop job server workers", mlog.Err(err))
}
if err = s.Jobs.StopSchedulers(); err != nil && !errors.Is(err, jobs.ErrSchedulersNotRunning) {
s.Log().Warn("Failed to stop job server schedulers", mlog.Err(err))
}
}
// Stop channels.
// This needs to happen last because channels are dependent
// on parent services.
if err = s.Channels().Stop(); err != nil {
s.Log().Warn("Unable to cleanly stop channels", mlog.Err(err))
}
if err = s.platform.Shutdown(); err != nil {
s.Log().Warn("Failed to stop platform", mlog.Err(err))
}
s.Log().Info("Server stopped")
// shutdown main and notification loggers which will flush any remaining log records.
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), time.Second*15)
defer timeoutCancel()
if err = s.Log().ShutdownWithTimeout(timeoutCtx); err != nil {
fmt.Fprintf(os.Stderr, "Error shutting down main logger: %v", err)
}
}
func (s *Server) Restart() error {
percentage, err := s.UpgradeToE0Status()
if err != nil || percentage != 100 {
return errors.Wrap(err, "unable to restart because the system has not been upgraded")
}
s.Shutdown()
argv0, err := exec.LookPath(os.Args[0])
if err != nil {
return err
}
if _, err = os.Stat(argv0); err != nil {
return err
}
mlog.Info("Restarting server")
return syscall.Exec(argv0, os.Args, os.Environ())
}
func (s *Server) CanIUpgradeToE0() error {
return upgrader.CanIUpgradeToE0()
}
func (s *Server) UpgradeToE0() error {
if err := upgrader.UpgradeToE0(); err != nil {
return err
}
upgradedFromTE := &model.System{Name: model.SystemUpgradedFromTeId, Value: "true"}
if err := s.Store().System().Save(upgradedFromTE); err != nil {
return err
}
return nil
}
func (s *Server) UpgradeToE0Status() (int64, error) {
return upgrader.UpgradeToE0Status()
}
// Go creates a goroutine, but maintains a record of it to ensure that execution completes before
// the server is shutdown.
func (s *Server) Go(f func()) {
s.platform.Go(f)
}
// GoBuffered acts like a semaphore which creates a goroutine, but maintains a record of it
// to ensure that execution completes before the server is shutdown.
func (s *Server) GoBuffered(f func()) {
s.platform.GoBuffered(f)
}
var corsAllowedMethods = []string{
"POST",
"GET",
"OPTIONS",
"PUT",
"PATCH",
"DELETE",
}
// golang.org/x/crypto/acme/autocert/autocert.go
func handleHTTPRedirect(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" && r.Method != "HEAD" {
http.Error(w, "Use HTTPS", http.StatusBadRequest)
return
}
target := "https://" + stripPort(r.Host) + r.URL.RequestURI()
http.Redirect(w, r, target, http.StatusFound)
}
// golang.org/x/crypto/acme/autocert/autocert.go
func stripPort(hostport string) string {
host, _, err := net.SplitHostPort(hostport)
if err != nil {
return hostport
}
return net.JoinHostPort(host, "443")
}
func (s *Server) Start() error {
// Start channels.
// This needs to happen before because channels is dependent on the HTTP server.
if err := s.Channels().Start(); err != nil {
return errors.Wrap(err, "Unable to start channels")
}
if s.joinCluster && s.platform.Cluster() != nil {
s.registerClusterHandlers()
s.platform.Cluster().StartInterNodeCommunication()
}
if err := s.ensureInstallationDate(); err != nil {
return errors.Wrapf(err, "unable to ensure installation date")
}
if err := s.ensureFirstServerRunTimestamp(); err != nil {
return errors.Wrapf(err, "unable to ensure first run timestamp")
}
if err := s.Store().Status().ResetAll(); err != nil {
mlog.Error("Error to reset the server status.", mlog.Err(err))
}
if s.MailServiceConfig().SendEmailNotifications {
if err := mail.TestConnection(s.MailServiceConfig()); err != nil {
mlog.Error("Mail server connection test failed", mlog.Err(err))
}
}
if s.AutoTranslation != nil {
if err := s.AutoTranslation.Start(); err != nil {
return errors.Wrap(err, "Unable to start auto-translation service")
}
}
err := s.FileBackend().TestConnection()
if err != nil {
if _, ok := err.(*filestore.S3FileBackendNoBucketError); ok {
err = s.FileBackend().(*filestore.S3FileBackend).MakeBucket()
}
if err != nil {
mlog.Error("Problem with file storage settings", mlog.Err(err))
}
}
s.checkPushNotificationServerURL()
if err = s.platform.ReloadConfig(); err != nil {
mlog.Error("Failed to reload config on server start", mlog.Err(err))
}
mlog.Info("Starting Server...")
var handler http.Handler = s.RootRouter
switch model.GetServiceEnvironment() {
case model.ServiceEnvironmentProduction, model.ServiceEnvironmentTest:
if *s.platform.Config().LogSettings.EnableDiagnostics && *s.platform.Config().LogSettings.EnableSentry {
sentryHandler := sentryhttp.New(sentryhttp.Options{
Repanic: true,
})
handler = sentryHandler.Handle(handler)
}
case model.ServiceEnvironmentDev:
}
if allowedOrigins := *s.platform.Config().ServiceSettings.AllowCorsFrom; allowedOrigins != "" {
exposedCorsHeaders := *s.platform.Config().ServiceSettings.CorsExposedHeaders
allowCredentials := *s.platform.Config().ServiceSettings.CorsAllowCredentials
debug := *s.platform.Config().ServiceSettings.CorsDebug
corsWrapper := cors.New(cors.Options{
AllowedOrigins: strings.Fields(allowedOrigins),
AllowedMethods: corsAllowedMethods,
AllowedHeaders: []string{"*"},
ExposedHeaders: strings.Fields(exposedCorsHeaders),
MaxAge: 86400,
AllowCredentials: allowCredentials,
Debug: debug,
})
// If we have debugging of CORS turned on then forward messages to logs
if debug {
corsWrapper.Log = s.Log().With(mlog.String("source", "cors")).StdLogger(mlog.LvlDebug)
}
handler = corsWrapper.Handler(handler)
}
if *s.platform.Config().RateLimitSettings.Enable {
mlog.Info("RateLimiter is enabled")
rateLimiter, err2 := NewRateLimiter(&s.platform.Config().RateLimitSettings, s.platform.Config().ServiceSettings.TrustedProxyIPHeader)
if err2 != nil {
return err2
}
s.RateLimiter = rateLimiter
handler = rateLimiter.RateLimitHandler(handler)
}
// Creating a logger for logging errors from http.Server at error level
errStdLog := s.Log().With(mlog.String("source", "httpserver")).StdLogger(mlog.LvlError)
s.Server = &http.Server{
Handler: handler,
ReadTimeout: time.Duration(*s.platform.Config().ServiceSettings.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(*s.platform.Config().ServiceSettings.WriteTimeout) * time.Second,
IdleTimeout: time.Duration(*s.platform.Config().ServiceSettings.IdleTimeout) * time.Second,
ErrorLog: errStdLog,
}
addr := *s.platform.Config().ServiceSettings.ListenAddress
if addr == "" {
if *s.platform.Config().ServiceSettings.ConnectionSecurity == model.ConnSecurityTLS {
addr = ":https"
} else {
addr = ":http"
}
}
listener, err := net.Listen("tcp", addr)
if err != nil {
return errors.Wrapf(err, i18n.T("api.server.start_server.starting.critical"), err)
}
s.ListenAddr = listener.Addr().(*net.TCPAddr)
logListeningPort := fmt.Sprintf("Server is listening on %v", listener.Addr().String())
mlog.Info(logListeningPort, mlog.String("address", listener.Addr().String()))
m := &autocert.Manager{
Cache: autocert.DirCache(*s.platform.Config().ServiceSettings.LetsEncryptCertificateCacheFile),
Prompt: autocert.AcceptTOS,
}
if *s.platform.Config().ServiceSettings.Forward80To443 {
if host, port, err := net.SplitHostPort(addr); err != nil {
mlog.Error("Unable to setup forwarding", mlog.Err(err))
} else if port != "443" {
return fmt.Errorf(i18n.T("api.server.start_server.forward80to443.enabled_but_listening_on_wrong_port"), port)
} else {
httpListenAddress := net.JoinHostPort(host, "http")
if *s.platform.Config().ServiceSettings.UseLetsEncrypt {
server := &http.Server{
Addr: httpListenAddress,
Handler: m.HTTPHandler(nil),
ErrorLog: s.Log().With(mlog.String("source", "le_forwarder_server")).StdLogger(mlog.LvlError),
}
go func() {
if err := server.ListenAndServe(); err != nil {
mlog.Error("Failed to serve redirect from port 80 to 443 with autocert ", mlog.Err(err))
}
}()
} else {
go func() {
redirectListener, err := net.Listen("tcp", httpListenAddress)
if err != nil {
mlog.Error("Unable to setup forwarding", mlog.Err(err))
return
}
defer redirectListener.Close()
server := &http.Server{
Handler: http.HandlerFunc(handleHTTPRedirect),
ErrorLog: s.Log().With(mlog.String("source", "forwarder_server")).StdLogger(mlog.LvlError),
}
if err := server.Serve(redirectListener); err != nil {
mlog.Error("Failed to serve redirect from port 80 to 443", mlog.Err(err))
}
}()
}
}
} else if *s.platform.Config().ServiceSettings.UseLetsEncrypt {
return errors.New(i18n.T("api.server.start_server.forward80to443.disabled_while_using_lets_encrypt"))
}
s.didFinishListen = make(chan struct{})
go func() {
var err error
if *s.platform.Config().ServiceSettings.ConnectionSecurity == model.ConnSecurityTLS {
tlsConfig := &tls.Config{
PreferServerCipherSuites: true,
CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
}
switch *s.platform.Config().ServiceSettings.TLSMinVer {
case "1.0":
tlsConfig.MinVersion = tls.VersionTLS10
case "1.1":
tlsConfig.MinVersion = tls.VersionTLS11
default:
tlsConfig.MinVersion = tls.VersionTLS12
}
defaultCiphers := []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
}
if len(s.platform.Config().ServiceSettings.TLSOverwriteCiphers) == 0 {
tlsConfig.CipherSuites = defaultCiphers
} else {
var cipherSuites []uint16
for _, cipher := range s.platform.Config().ServiceSettings.TLSOverwriteCiphers {
value, ok := model.ServerTLSSupportedCiphers[cipher]
if !ok {
mlog.Warn("Unsupported cipher passed", mlog.String("cipher", cipher))
continue
}
cipherSuites = append(cipherSuites, value)
}
if len(cipherSuites) == 0 {
mlog.Warn("No supported ciphers passed, fallback to default cipher suite")
cipherSuites = defaultCiphers
}
tlsConfig.CipherSuites = cipherSuites
}
certFile := ""
keyFile := ""
if *s.platform.Config().ServiceSettings.UseLetsEncrypt {
tlsConfig.GetCertificate = m.GetCertificate
tlsConfig.NextProtos = append(tlsConfig.NextProtos, "h2")
} else {
certFile = *s.platform.Config().ServiceSettings.TLSCertFile
keyFile = *s.platform.Config().ServiceSettings.TLSKeyFile
}
s.Server.TLSConfig = tlsConfig
err = s.Server.ServeTLS(listener, certFile, keyFile)
} else {
err = s.Server.Serve(listener)
}
if err != nil && err != http.ErrServerClosed {
mlog.Fatal("Error starting server", mlog.Err(err))
time.Sleep(time.Second)
}
close(s.didFinishListen)
}()
if *s.platform.Config().ServiceSettings.EnableLocalMode {
if err := s.startLocalModeServer(); err != nil {
mlog.Fatal(err.Error())
}
}
if err := s.startInterClusterServices(s.License()); err != nil {
mlog.Error("Error starting inter-cluster services", mlog.Err(err))
}
return nil
}
func (s *Server) startLocalModeServer() error {
s.localModeServer = &http.Server{
Handler: s.LocalRouter,
}
socket := *s.platform.Config().ServiceSettings.LocalModeSocketLocation
if err := os.RemoveAll(socket); err != nil {
return errors.Wrapf(err, i18n.T("api.server.start_server.starting.critical"), err)
}
unixListener, err := net.Listen("unix", socket)
if err != nil {
return errors.Wrapf(err, i18n.T("api.server.start_server.starting.critical"), err)
}
if err = os.Chmod(socket, 0600); err != nil {
return errors.Wrapf(err, i18n.T("api.server.start_server.starting.critical"), err)
}
go func() {
err = s.localModeServer.Serve(unixListener)
if err != nil && err != http.ErrServerClosed {
mlog.Fatal("Error starting unix socket server", mlog.Err(err))
}
}()
return nil
}
func (s *Server) stopLocalModeServer() {
if s.localModeServer != nil {
s.localModeServer.Close()
}
}
func (a *App) OriginChecker() func(*http.Request) bool {
if allowed := *a.Config().ServiceSettings.AllowCorsFrom; allowed != "" {
if allowed != "*" {
siteURL, err := url.Parse(*a.Config().ServiceSettings.SiteURL)
if err == nil {
siteURL.Path = ""
allowed += " " + siteURL.String()
}
}
return utils.OriginChecker(allowed)
}
// Overriding the default origin checker
return func(r *http.Request) bool {
origin := r.Header["Origin"]
if len(origin) == 0 {
return true
}
if origin[0] == "null" {
return false
}
u, err := url.Parse(origin[0])
if err != nil {
return false
}
// To maintain the case where siteURL is not set.
if *a.Config().ServiceSettings.SiteURL == "" {
return strings.EqualFold(u.Host, r.Host)
}
siteURL, err := url.Parse(*a.Config().ServiceSettings.SiteURL)
if err != nil {
return false
}
return strings.EqualFold(u.Host, siteURL.Host) && strings.EqualFold(u.Scheme, siteURL.Scheme)
}
}
func (s *Server) checkPushNotificationServerURL() {
notificationServer := *s.platform.Config().EmailSettings.PushNotificationServer
if strings.HasPrefix(notificationServer, "http://") {
mlog.Warn("Your push notification server is configured with HTTP. For improved security, update to HTTPS in your configuration.")
}
}
func runSecurityJob(s *Server) {
doSecurity(s)
model.CreateRecurringTask("Security", func() {
doSecurity(s)
}, time.Hour*4)
}
func runTokenCleanupJob(s *Server) {
doTokenCleanup(s)
model.CreateRecurringTask("Token Cleanup", func() {
doTokenCleanup(s)
}, time.Hour*1)
}
func runCommandWebhookCleanupJob(s *Server) {
doCommandWebhookCleanup(s)
model.CreateRecurringTask("Command Hook Cleanup", func() {
doCommandWebhookCleanup(s)
}, time.Hour*1)
}
func runSessionCleanupJob(s *Server) {
doSessionCleanup(s)
model.CreateRecurringTask("Session Cleanup", func() {
doSessionCleanup(s)
}, time.Hour*24)
}
func runJobsCleanupJob(s *Server) {
doJobsCleanup(s)
model.CreateRecurringTask("Job Cleanup", func() {
doJobsCleanup(s)
}, time.Hour*24)
}
func runConfigCleanupJob(s *Server) {
doConfigCleanup(s)
model.CreateRecurringTask("Configuration Cleanup", func() {
doConfigCleanup(s)
}, time.Hour*24)
}
func (s *Server) runLicenseExpirationCheckJob() {
s.doLicenseExpirationCheck()
model.CreateRecurringTask("License Expiration Check", func() {
s.doLicenseExpirationCheck()
}, time.Hour*24)
}
func runReportToAWSMeterJob(s *Server) {
model.CreateRecurringTask("Collect and send usage report to AWS Metering Service", func() {
doReportUsageToAWSMeteringService(s)
}, time.Hour*model.AwsMeteringReportInterval)
}
func doReportUsageToAWSMeteringService(s *Server) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*s.platform.Config().ServiceSettings.AWSMeteringTimeoutSeconds)*time.Second)
defer cancel()
awsMeter := awsmeter.New(ctx, s.Store(), s.platform.Config())
if awsMeter == nil {
mlog.Error("Cannot obtain instance of AWS Metering Service.")
return
}
dimensions := []string{model.AwsMeteringDimensionUsageHrs}
reports := awsMeter.GetUserCategoryUsage(dimensions, time.Now().UTC(), time.Now().Add(-model.AwsMeteringReportInterval*time.Hour).UTC())
if err := awsMeter.ReportUserCategoryUsage(ctx, reports); err != nil {
mlog.Error("Failed to report usage to AWS Metering Service", mlog.Err(err))
}
}
func doSecurity(s *Server) {
s.DoSecurityUpdateCheck()
}
// Reports activated user count to the CWS every 24 hours
func runCloudUserCountReportJob(s *Server) {
model.CreateRecurringTask("Report user count for cloud subscription", func() {
s.doReportUserCountForCloudSubscriptionJob()
}, time.Hour*24)
}
func doTokenCleanup(s *Server) {
expiry := model.GetMillis() - model.MaxTokenExipryTime
mlog.Debug("Cleaning up token store.")
s.Store().Token().Cleanup(expiry)
}
func doCommandWebhookCleanup(s *Server) {
s.Store().CommandWebhook().Cleanup()
}
const (
sessionsCleanupBatchSize = 1000
jobsCleanupBatchSize = 1000
)
func doSessionCleanup(s *Server) {
mlog.Debug("Cleaning up session store.")
err := s.Store().Session().Cleanup(model.GetMillis(), sessionsCleanupBatchSize)
if err != nil {
mlog.Warn("Error while cleaning up sessions", mlog.Err(err))
}
}
func doJobsCleanup(s *Server) {
if *s.platform.Config().JobSettings.CleanupJobsThresholdDays < 0 {
return
}
mlog.Debug("Cleaning up jobs store.")
dur := time.Duration(*s.platform.Config().JobSettings.CleanupJobsThresholdDays) * time.Hour * 24
expiry := model.GetMillisForTime(time.Now().Add(-dur))
err := s.Store().Job().Cleanup(expiry, jobsCleanupBatchSize)
if err != nil {
mlog.Warn("Error while cleaning up jobs", mlog.Err(err))
}
}
func doConfigCleanup(s *Server) {
if *s.platform.Config().JobSettings.CleanupConfigThresholdDays < 0 || !config.IsDatabaseDSN(s.platform.DescribeConfig()) {
return
}
mlog.Info("Cleaning up configuration store.")
if err := s.platform.CleanUpConfig(); err != nil {
mlog.Warn("Error while cleaning up configurations", mlog.Err(err))
}
}
func (s *Server) HandleMetrics(route string, h http.Handler) {
s.platform.HandleMetrics(route, h)
}
func (s *Server) sendLicenseUpForRenewalEmail(users map[string]*model.User, license *model.License) *model.AppError {
key := model.LicenseUpForRenewalEmailSent + license.Id
if _, err := s.Store().System().GetByName(key); err == nil {
// return early because the key already exists and that means we already executed the code below to send email successfully
return nil
}
daysToExpiration := license.DaysToExpiration()
// we want to at least have one email sent out to an admin
countNotOks := 0
for _, user := range users {
if err := s.EmailService.SendLicenseUpForRenewalEmail(user.Email, user.Locale, daysToExpiration); err != nil {
mlog.Error("Error sending license up for renewal email to", mlog.String("user_email", user.Email), mlog.Err(err))
countNotOks++
}
}
// if not even one admin got an email, we consider that this operation errored
if countNotOks == len(users) {
return model.NewAppError("s.sendLicenseUpForRenewalEmail", "api.server.license_up_for_renewal.error_sending_email", nil, "", http.StatusInternalServerError)
}
system := model.System{
Name: key,
Value: "true",
}
if err := s.Store().System().Save(&system); err != nil {
mlog.Debug("Failed to mark license up for renewal email sending as completed.", mlog.Err(err))
}
return nil
}
func (s *Server) doReportUserCountForCloudSubscriptionJob() {
s.LoadLicense()
if !s.License().IsCloud() {
return
}
mlog.Debug("Reporting daily user count for cloud subscription.")
appInstance := New(ServerConnector(s.Channels()))
_, err := appInstance.SendSubscriptionHistoryEvent("")
if err != nil {
mlog.Error("an error occurred during daily user count reporting", mlog.Err(err))
}
mlog.Debug("Daily user count reported for cloud subscription.")
}
func (s *Server) doLicenseExpirationCheck() {
s.LoadLicense()
// This takes care of a rare edge case reported here https://mattermost.atlassian.net/browse/MM-40962
// To reproduce that case locally, attach a license to a server that was started with enterprise enabled
// Then restart using BUILD_ENTERPRISE=false make restart-server to enter Team Edition
if model.BuildEnterpriseReady != "true" {
mlog.Debug("Skipping license expiration check because no license is expected on Team Edition")
return
}
license := s.License()
if license == nil {
mlog.Debug("License cannot be found.")
return
}
if license.IsCloud() || license.IsMattermostEntry() {
return
}
users, err := s.Store().User().GetSystemAdminProfiles()
if err != nil {
mlog.Error("Failed to get system admins for license expired message from Mattermost.")
return
}
if license.IsWithinExpirationPeriod() {
appErr := s.sendLicenseUpForRenewalEmail(users, license)
if appErr != nil {
mlog.Debug(appErr.Error())
}
return
}
if !license.IsPastGracePeriod() {
mlog.Debug("License is not past the grace period.")
return
}
// send email to admin(s)
for _, user := range users {
if user.Email == "" {
mlog.Error("Invalid system admin email.", mlog.String("user_email", user.Email))
continue
}
mlog.Debug("Sending license expired email.", mlog.String("user_email", user.Email))
if err := s.SendRemoveExpiredLicenseEmail(user.Email, user.Locale); err != nil {
mlog.Error("Error while sending the license expired email.", mlog.String("user_email", user.Email), mlog.Err(err))
}
}
// remove the license
if appErr := s.RemoveLicense(); appErr != nil {
mlog.Error("Error while removing the license.", mlog.Err(appErr))
}
}
func (s *Server) SendRemoveExpiredLicenseEmail(email, locale string) *model.AppError {
if err := s.EmailService.SendRemoveExpiredLicenseEmail(email, locale); err != nil {
return model.NewAppError("SendRemoveExpiredLicenseEmail", "api.license.remove_expired_license.failed.error", nil, "", http.StatusInternalServerError).Wrap(err)
}
return nil
}
func (s *Server) FileBackend() filestore.FileBackend {
return s.platform.FileBackend()
}
func (s *Server) ExportFileBackend() filestore.FileBackend {
return s.platform.ExportFileBackend()
}
func (s *Server) TotalWebsocketConnections() int {
return s.Platform().TotalWebsocketConnections()
}
func (ch *Channels) ClientConfigHash() string {
return ch.srv.Platform().ClientConfigHash()
}
func (s *Server) initJobs() {
s.Jobs = jobs.NewJobServer(s.platform, s.Store(), s.GetMetrics(), s.Log())
if jobsDataRetentionJobInterface != nil {
builder := jobsDataRetentionJobInterface(s)
s.Jobs.RegisterJobType(model.JobTypeDataRetention, builder.MakeWorker(), builder.MakeScheduler())
}
if jobsMessageExportJobInterface != nil {
builder := jobsMessageExportJobInterface(s)
s.Jobs.RegisterJobType(model.JobTypeMessageExport, builder.MakeWorker(), builder.MakeScheduler())
}
if jobsElasticsearchAggregatorInterface != nil {
builder := jobsElasticsearchAggregatorInterface(s)
s.Jobs.RegisterJobType(model.JobTypeElasticsearchPostAggregation, builder.MakeWorker(), builder.MakeScheduler())
}
if jobsElasticsearchIndexerInterface != nil {
builder := jobsElasticsearchIndexerInterface(s)
s.Jobs.RegisterJobType(model.JobTypeElasticsearchPostIndexing, builder.MakeWorker(), nil)
}
if jobsLdapSyncInterface != nil {
builder := jobsLdapSyncInterface(New(ServerConnector(s.Channels())))
s.Jobs.RegisterJobType(model.JobTypeLdapSync, builder.MakeWorker(), builder.MakeScheduler())
}
if jobsAccessControlSyncJobInterface != nil {
builder := jobsAccessControlSyncJobInterface(s)
s.Jobs.RegisterJobType(model.JobTypeAccessControlSync, builder.MakeWorker(), builder.MakeScheduler())
}
if pushProxyInterface != nil {
builder := pushProxyInterface(New(ServerConnector(s.Channels())))
s.Jobs.RegisterJobType(model.JobTypePushProxyAuth, builder.MakeWorker(), builder.MakeScheduler())
}
if s.AutoTranslation != nil {
s.Jobs.RegisterJobType(model.JobTypeAutoTranslationRecovery,
s.AutoTranslation.MakeWorker(),
s.AutoTranslation.MakeScheduler())
}
s.Jobs.RegisterJobType(
model.JobTypeMigrations,
migrations.MakeWorker(s.Jobs, s.Store()),
migrations.MakeScheduler(s.Jobs, s.Store()),
)
s.Jobs.RegisterJobType(
model.JobTypePlugins,
plugins.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))),
plugins.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeExpiryNotify,
expirynotify.MakeWorker(s.Jobs, New(ServerConnector(s.Channels())).NotifySessionsExpired),
expirynotify.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeProductNotices,
product_notices.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))),
product_notices.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeImportProcess,
import_process.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))),
nil,
)
s.Jobs.RegisterJobType(
model.JobTypeImportDelete,
import_delete.MakeWorker(s.Jobs, New(ServerConnector(s.Channels())), s.Store()),
import_delete.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeS3PathMigration,
s3_path_migration.MakeWorker(s.Jobs, s.Store(), s.FileBackend()),
nil)
s.Jobs.RegisterJobType(
model.JobTypeDeleteEmptyDraftsMigration,
delete_empty_drafts_migration.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
nil)
s.Jobs.RegisterJobType(
model.JobTypeDeleteOrphanDraftsMigration,
delete_orphan_drafts_migration.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
nil)
s.Jobs.RegisterJobType(
model.JobTypeExportDelete,
export_delete.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))),
export_delete.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeExportProcess,
export_process.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))),
nil,
)
s.Jobs.RegisterJobType(
model.JobTypeActiveUsers,
active_users.MakeWorker(s.Jobs, s.Store(), func() einterfaces.MetricsInterface { return s.GetMetrics() }),
active_users.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeMobileSessionMetadata,
mobile_session_metadata.MakeWorker(s.Jobs, s.Store(), func() einterfaces.MetricsInterface { return s.GetMetrics() }),
mobile_session_metadata.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeResendInvitationEmail,
resend_invitation_email.MakeWorker(s.Jobs, New(ServerConnector(s.Channels())), s.Store()),
nil,
)
s.Jobs.RegisterJobType(
model.JobTypeExtractContent,
extract_content.MakeWorker(s.Jobs, New(ServerConnector(s.Channels())), s.Store()),
nil,
)
s.Jobs.RegisterJobType(
model.JobTypeLastAccessiblePost,
last_accessible_post.MakeWorker(s.Jobs, s.License(), New(ServerConnector(s.Channels()))),
last_accessible_post.MakeScheduler(s.Jobs, s.License()),
)
s.Jobs.RegisterJobType(
model.JobTypeLastAccessibleFile,
last_accessible_file.MakeWorker(s.Jobs, s.License(), New(ServerConnector(s.Channels()))),
last_accessible_file.MakeScheduler(s.Jobs, s.License()),
)
s.Jobs.RegisterJobType(
model.JobTypeUpgradeNotifyAdmin,
notify_admin.MakeUpgradeNotifyWorker(s.Jobs, s.License(), New(ServerConnector(s.Channels()))),
notify_admin.MakeScheduler(s.Jobs, s.License(), model.JobTypeUpgradeNotifyAdmin),
)
s.Jobs.RegisterJobType(
model.JobTypeTrialNotifyAdmin,
notify_admin.MakeTrialNotifyWorker(s.Jobs, s.License(), New(ServerConnector(s.Channels()))),
notify_admin.MakeScheduler(s.Jobs, s.License(), model.JobTypeTrialNotifyAdmin),
)
s.Jobs.RegisterJobType(
model.JobTypePostPersistentNotifications,
post_persistent_notifications.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))),
post_persistent_notifications.MakeScheduler(s.Jobs, func() *model.License { return s.License() }),
)
s.Jobs.RegisterJobType(
model.JobTypeInstallPluginNotifyAdmin,
notify_admin.MakeInstallPluginNotifyWorker(s.Jobs, New(ServerConnector(s.Channels()))),
notify_admin.MakeInstallPluginScheduler(s.Jobs, s.License(), model.JobTypeInstallPluginNotifyAdmin),
)
s.Jobs.RegisterJobType(
model.JobTypeHostedPurchaseScreening,
hosted_purchase_screening.MakeWorker(s.Jobs, s.License(), s.Store().System()),
hosted_purchase_screening.MakeScheduler(s.Jobs, s.License()),
)
s.Jobs.RegisterJobType(
model.JobTypeCleanupDesktopTokens,
cleanup_desktop_tokens.MakeWorker(s.Jobs),
cleanup_desktop_tokens.MakeScheduler(s.Jobs),
)
s.Jobs.RegisterJobType(
model.JobTypeRefreshMaterializedViews,
refresh_materialized_views.MakeWorker(s.Jobs, *s.platform.Config().SqlSettings.DriverName),
refresh_materialized_views.MakeScheduler(s.Jobs, *s.platform.Config().SqlSettings.DriverName),
)
s.Jobs.RegisterJobType(
model.JobTypeExportUsersToCSV,
export_users_to_csv.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
nil,
)
s.Jobs.RegisterJobType(
model.JobTypeDeleteDmsPreferencesMigration,
delete_dms_preferences_migration.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
nil)
s.Jobs.RegisterJobType(
model.JobTypeRecap,
recap.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
nil,
)
s.Jobs.RegisterJobType(
model.JobTypeDeleteExpiredPosts,
delete_expired_posts.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
delete_expired_posts.MakeScheduler(s.Jobs),
)
s.platform.Jobs = s.Jobs
}
// ServerId returns the unique identifier for an installation of Mattermost servers.
//
// It is also known as the "telemetry id" or the "diagnostic id". Once generated
// on first start, the value is persisted to the database and should remain static
// for the lifetime of the installation.
//
// Only one server in a cluster will succeed in writing to the database on first
// start, after which the other servers will converge on the same value.
func (s *Server) ServerId() string {
if s.telemetryService != nil && s.telemetryService.ServerID != "" {
return s.telemetryService.ServerID
}
prop, err := s.Store().System().GetByNameWithContext(
store.RequestContextWithMaster(request.EmptyContext(s.Log())),
model.SystemServerId,
)
if err != nil {
return ""
}
return prop.Value
}
func (s *Server) HTTPService() httpservice.HTTPService {
return s.httpService
}
// GetStore returns the server's Store. Exposing via a method
// allows interfaces to be created with subsets of server APIs.
func (s *Server) GetStore() store.Store {
return s.Store()
}
// GetRemoteClusterService returns the `RemoteClusterService` instantiated by the server.
// May be nil if the service is not enabled via license.
func (s *Server) GetRemoteClusterService() remotecluster.RemoteClusterServiceIFace {
s.serviceMux.RLock()
defer s.serviceMux.RUnlock()
return s.remoteClusterService
}
// GetSharedChannelSyncService returns the `SharedChannelSyncService` instantiated by the server.
// May be nil if the service is not enabled via license.
func (s *Server) GetSharedChannelSyncService() SharedChannelServiceIFace {
s.serviceMux.RLock()
defer s.serviceMux.RUnlock()
return s.sharedChannelService
}
// GetMetrics returns the server's Metrics interface. Exposing via a method
// allows interfaces to be created with subsets of server APIs.
func (s *Server) GetMetrics() einterfaces.MetricsInterface {
if s.platform == nil {
return nil
}
return s.platform.Metrics()
}
// setSharedChannelSyncService sets the `SharedChannelSyncService` to be used by the server.
// For testing only.
func (s *Server) SetSharedChannelSyncService(sharedChannelService SharedChannelServiceIFace) {
s.serviceMux.Lock()
defer s.serviceMux.Unlock()
s.sharedChannelService = sharedChannelService
s.platform.SetSharedChannelService(sharedChannelService)
}
func (s *Server) GetProfileImage(user *model.User) ([]byte, bool, *model.AppError) {
if *s.platform.Config().FileSettings.DriverName == "" {
img, appErr := s.GetDefaultProfileImage(user)
if appErr != nil {
return nil, false, appErr
}
return img, false, nil
}
path := getProfileImagePath(user.Id)
data, err := s.ReadFile(path)
if err != nil {
img, appErr := s.GetDefaultProfileImage(user)
if appErr != nil {
return nil, false, appErr
}
if user.LastPictureUpdate == 0 {
if _, err := s.writeFile(bytes.NewReader(img), path); err != nil {
return nil, false, err
}
}
return img, true, nil
}
return data, false, nil
}
func (s *Server) GetDefaultProfileImage(user *model.User) ([]byte, *model.AppError) {
img, err := s.userService.GetDefaultProfileImage(user)
if err != nil {
switch {
case errors.Is(err, users.DefaultFontError):
return nil, model.NewAppError("GetDefaultProfileImage", "api.user.create_profile_image.default_font.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
case errors.Is(err, users.UserInitialsError):
return nil, model.NewAppError("GetDefaultProfileImage", "api.user.create_profile_image.initial.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
default:
return nil, model.NewAppError("GetDefaultProfileImage", "api.user.create_profile_image.encode.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
}
return img, nil
}
func (s *Server) ReadFile(path string) ([]byte, *model.AppError) {
result, nErr := s.FileBackend().ReadFile(path)
if nErr != nil {
return nil, model.NewAppError("ReadFile", "api.file.read_file.app_error", nil, "", http.StatusInternalServerError).Wrap(nErr)
}
return result, nil
}
func withMut(mut *sync.Mutex, f func()) {
mut.Lock()
defer mut.Unlock()
f()
}
func cancelTask(mut *sync.Mutex, taskPointer **model.ScheduledTask) {
mut.Lock()
defer mut.Unlock()
if *taskPointer != nil {
(*taskPointer).Cancel()
*taskPointer = nil
}
}
func runDNDStatusExpireJob(a *App) {
if a.IsLeader() {
withMut(&a.ch.dndTaskMut, func() {
a.ch.dndTask = model.CreateRecurringTaskFromNextIntervalTime("Unset DND Statuses", a.UpdateDNDStatusOfUsers, model.DNDExpiryInterval)
})
} else {
mlog.Debug("Skipping unset DND status job startup since this is not the leader node")
}
a.ch.srv.AddClusterLeaderChangedListener(func() {
mlog.Info("Cluster leader changed. Determining if unset DNS status task should be running", mlog.Bool("isLeader", a.IsLeader()))
if a.IsLeader() {
withMut(&a.ch.dndTaskMut, func() {
a.ch.dndTask = model.CreateRecurringTaskFromNextIntervalTime("Unset DND Statuses", a.UpdateDNDStatusOfUsers, model.DNDExpiryInterval)
})
} else {
mlog.Debug("This is no longer leader node. Cancelling the unset DND status task", mlog.Bool("isLeader", a.IsLeader()))
cancelTask(&a.ch.dndTaskMut, &a.ch.dndTask)
}
})
}
func runPostReminderJob(a *App) {
if a.IsLeader() {
rctx := request.EmptyContext(a.Log())
withMut(&a.ch.postReminderMut, func() {
fn := func() { a.CheckPostReminders(rctx) }
a.ch.postReminderTask = model.CreateRecurringTaskFromNextIntervalTime("Check Post reminders", fn, 5*time.Minute)
})
} else {
mlog.Debug("Skipping post reminder job startup since this is not the leader node")
}
a.ch.srv.AddClusterLeaderChangedListener(func() {
mlog.Info("Cluster leader changed. Determining if post reminder task should be running", mlog.Bool("isLeader", a.IsLeader()))
if a.IsLeader() {
rctx := request.EmptyContext(a.Log())
withMut(&a.ch.postReminderMut, func() {
fn := func() { a.CheckPostReminders(rctx) }
a.ch.postReminderTask = model.CreateRecurringTaskFromNextIntervalTime("Check Post reminders", fn, 5*time.Minute)
})
} else {
mlog.Debug("This is no longer leader node. Cancelling the post reminder task", mlog.Bool("isLeader", a.IsLeader()))
cancelTask(&a.ch.postReminderMut, &a.ch.postReminderTask)
}
})
}
func runScheduledPostJob(a *App) {
if a.IsLeader() {
doRunScheduledPostJob(a)
} else {
mlog.Debug("Skipping scheduled posts job startup since this is not the leader node")
}
a.ch.srv.AddClusterLeaderChangedListener(func() {
mlog.Info("Cluster leader changed. Determining if scheduled posts task should be running", mlog.Bool("isLeader", a.IsLeader()))
if a.IsLeader() {
doRunScheduledPostJob(a)
} else {
mlog.Debug("This is no longer leader node. Cancelling the scheduled post task", mlog.Bool("isLeader", a.IsLeader()))
cancelTask(&a.ch.scheduledPostMut, &a.ch.scheduledPostTask)
}
})
}
func doRunScheduledPostJob(a *App) {
var jobInterval time.Duration
if *a.Config().ServiceSettings.EnableTesting {
jobInterval = debugScheduledPostJobInterval
} else {
jobInterval = scheduledPostJobInterval
}
rctx := request.EmptyContext(a.Log())
withMut(&a.ch.scheduledPostMut, func() {
fn := func() { a.ProcessScheduledPosts(rctx) }
a.ch.scheduledPostTask = model.CreateRecurringTaskFromNextIntervalTime("Process Scheduled Posts", fn, jobInterval)
})
}
func (a *App) GetAppliedSchemaMigrations() ([]model.AppliedMigration, *model.AppError) {
table, err := a.Srv().Store().GetAppliedMigrations()
if err != nil {
return nil, model.NewAppError("GetDBSchemaTable", "api.file.read_file.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
return table, nil
}
// Expose platform service from server, this should be replaced with server itself in time.
func (s *Server) Platform() *platform.PlatformService {
return s.platform
}
func (s *Server) Log() *mlog.Logger {
return s.platform.Logger()
}