mirror of
https://github.com/mattermost/mattermost.git
synced 2026-05-28 04:35:04 -04:00
* Add atomic login-attempt counter primitives to UserStore

Two new store methods back the upcoming switch from a global
per-node mutex to per-user atomic slot claiming:

  TryIncrementFailedPasswordAttempts(userID, maxAttempts) (bool, error)

      UPDATE Users SET FailedAttempts = FailedAttempts + 1
      WHERE Id = ? AND FailedAttempts < maxAttempts

    Returns true when a slot was claimed (rows affected == 1) and
    false when the cap was already reached. The conditional UPDATE
    serialises concurrent attempts on the same user via the row
    lock, so the cap is enforced without any application-level
    locking and without serialising attempts across users.

  DecrementFailedPasswordAttempts(userID) error

      UPDATE Users SET FailedAttempts = FailedAttempts - 1
      WHERE Id = ? AND FailedAttempts > 0

    Releases a slot previously claimed by TryIncrement when the
    in-flight authentication turns out not to be a credential
    failure. The conditional UPDATE means concurrent decrements
    cannot underflow.

Storetest covers both primitives: claim-below-cap, reject-at-cap,
reject-above-cap, no-op for unknown user, and a 50-goroutine
concurrent test with a start barrier asserting exactly
maxAttempts slots are ever claimed and that decrement clamps at
zero under contention.

The testify mock is regenerated here so the storetest package
that returns *mocks.UserStore as a store.UserStore still satisfies
the interface; the wrapper layers are regenerated in the next
commit.
------
AI assisted commit
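
The conditional-update idea in the commit above can be sketched without a database. This is an illustrative model only, not the store implementation: `userRow`, `tryIncrement`, `decrement`, and `claimConcurrently` are hypothetical names, and a `sync.Mutex` stands in for the row lock that the real UPDATE statement relies on.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// userRow is a hypothetical stand-in for one row of the Users table.
// The mutex plays the role of the database row lock.
type userRow struct {
	mu             sync.Mutex
	failedAttempts int
}

// tryIncrement models the conditional UPDATE: the cap check and the
// write happen under one lock, so the counter can never pass maxAttempts.
func (u *userRow) tryIncrement(maxAttempts int) bool {
	u.mu.Lock()
	defer u.mu.Unlock()
	if u.failedAttempts >= maxAttempts {
		return false // cap reached: zero rows affected
	}
	u.failedAttempts++
	return true // slot claimed: one row affected
}

// decrement models the guarded decrement: it is a no-op at zero.
func (u *userRow) decrement() {
	u.mu.Lock()
	defer u.mu.Unlock()
	if u.failedAttempts > 0 {
		u.failedAttempts--
	}
}

// attempts reads the counter under the lock.
func (u *userRow) attempts() int {
	u.mu.Lock()
	defer u.mu.Unlock()
	return u.failedAttempts
}

// claimConcurrently releases n goroutines from a start barrier and
// returns how many of them managed to claim a slot.
func claimConcurrently(u *userRow, n, maxAttempts int) int {
	var claimed int64
	var wg sync.WaitGroup
	start := make(chan struct{})
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start // wait for the barrier so all goroutines race together
			if u.tryIncrement(maxAttempts) {
				atomic.AddInt64(&claimed, 1)
			}
		}()
	}
	close(start)
	wg.Wait()
	return int(atomic.LoadInt64(&claimed))
}

func main() {
	u := &userRow{}
	fmt.Println("claimed:", claimConcurrently(u, 50, 3))
	fmt.Println("counter:", u.attempts())
	for i := 0; i < 10; i++ {
		u.decrement()
	}
	fmt.Println("after decrements:", u.attempts())
}
```

With 50 goroutines and a cap of 3, exactly three claims succeed regardless of scheduling, and repeated decrements stop at zero. In the real store the same guarantee comes from the WHERE clause being evaluated under the row lock rather than from an in-process mutex.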
* Regenerate store layers for the new primitives

Pick up TryIncrementFailedPasswordAttempts and
DecrementFailedPasswordAttempts in every generated wrapper:

- retrylayer: retry on repeatable errors using the standard
  three-attempt loop.
- timerlayer: record store-method duration metrics under
  UserStore.TryIncrementFailedPasswordAttempts and
  UserStore.DecrementFailedPasswordAttempts.
- localcachelayer: invalidate the profile cache only after the
  underlying conditional UPDATE actually changes a row; an
  at-cap no-op return on TryIncrement no longer produces
  unnecessary cluster invalidation traffic.
------
AI assisted commit
* Drop login-attempt mutex; use per-user slot claiming

Replace the global per-node mutex that serialised every login
attempt with the database-side atomic slot machine added on the
Users row. Each of the three authentication entry points now
pre-claims a slot via TryIncrementFailedPasswordAttempts before
running the expensive password / LDAP / MFA check, and releases
the slot when the failure path is not a real credential mismatch:

- CheckPasswordAndAllCriteria (email/password): refunds the
  slot on backend errors during the password check (malformed
  stored hash, hasher misc failure, password-migration write
  failure) so a transient infra issue cannot ratchet
  FailedAttempts to a lockout for a user with valid credentials;
  refunds on the MFA pre-flight probe (empty mfaToken on an
  MFA-enabled user) so the probe is not counted as a real
  attempt.
- DoubleCheckPassword: same backend-error refund predicate.
- checkLdapUserPasswordAndAllCriteria: pre-claims only for
  existing users (first-time LDAP users have no local row to
  claim against); refunds non-credential DoLogin errors (server
  unreachable, transient) so an LDAP outage cannot lock out
  everyone; refunds the MFA pre-flight probe; for first-time
  users, explicitly bumps the counter via UpdateFailedPasswordAttempts
  on a real bad-password or bad-MFA attempt, matching the
  pre-refactor counting behaviour.

If the refund itself fails the underlying authentication error is
preserved and returned to the caller (the failure is logged); a
leaked slot is annoying, but masking the real failure with a
generic store 500 would be a clear observability regression.

Cluster-wide behaviour also changes: the previous design honoured
MaximumLoginAttempts per node, so an n-node cluster effectively
permitted n * MaximumLoginAttempts attempts. The cap is now
enforced globally.
------
AI assisted commit
* Cover app-layer behaviors of the new login slot machine

The store-layer tests already exercise TryIncrement and Decrement
under concurrency and at the cap boundary. The new behavioural
contracts at the app layer were not covered, so a regression that
flipped a refund predicate, a probe condition, or a first-time
LDAP path would have slipped through type checking and existing
unit tests.

Add tests around the three callers of the new path:

- CheckPasswordAndAllCriteria: an MFA pre-flight probe (empty
  token) does not consume a slot; a real attempt with a wrong
  non-empty token does; a backend error during the password
  check (malformed stored hash) refunds the slot; the happy
  path also asserts FailedAttempts resets to zero.
- DoubleCheckPassword: gets its first test coverage, covering
  the happy path, rate-limit rejection once max attempts is
  reached, and the backend-error refund path.
- checkLdapUserPasswordAndAllCriteria: covers paths the table
  loop did not exercise: a first-time LDAP user with a bad
  password (uses GetUserByAuth to reach the freshly created
  row), a first-time LDAP user with a wrong MFA token, an existing
  LDAP user with a non-credential DoLogin error (slot
  refunded), and the existing LDAP user MFA pre-flight probe
  (slot refunded).
------
AI assisted commit
* Address coderabbit review
------
AI assisted commit
* Fix race in first-time LDAP failed-attempt counter

For first-time LDAP users we have no local row to pre-claim, so
the bad-password and bad-MFA branches fell back to an absolute
UpdateFailedPasswordAttempts(id, ldapUser.FailedAttempts+1) based
on a snapshot from GetUserByAuth. Concurrent first-attempt
requests for the same user could all read FailedAttempts == 0 and
all write 1, losing increments. As a secondary issue the absolute
set did not enforce MaximumLoginAttempts, so the counter could
also drift past the cap.

Switch both branches to TryIncrementFailedPasswordAttempts, the
atomic conditional UPDATE already used on every other path. The
row lock serialises concurrent increments and the predicate caps
at MaximumLoginAttempts.

A new concurrent storetest-style subtest runs
3 * maxFailedLoginAttempts goroutines through the first-time
bad-password path against the same fresh LDAP row and asserts
FailedAttempts lands at exactly maxFailedLoginAttempts. Against
the previous absolute-set implementation the test fails (observed
FailedAttempts = 4 with maxFailedLoginAttempts = 3, either a lost
increment or a cap overshoot).

The first-time bad-password branch also switches from a wrapped
500 return on store error to log-and-continue, matching the rest
of the file's refund/probe error handling: the underlying LDAP
authentication failure is the more useful error for the caller.
------
AI assisted commit
* Address review comments
------
AI assisted commit
---------
Co-authored-by: Mattermost Build <build@mattermost.com>
353 lines
11 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package app

import (
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/pkg/errors"

	"github.com/mattermost/mattermost/server/public/model"
	"github.com/mattermost/mattermost/server/public/plugin"
	"github.com/mattermost/mattermost/server/public/shared/mlog"
	"github.com/mattermost/mattermost/server/public/shared/request"
	"github.com/mattermost/mattermost/server/v8/channels/app/imaging"
	"github.com/mattermost/mattermost/server/v8/config"
	"github.com/mattermost/mattermost/server/v8/einterfaces"
	"github.com/mattermost/mattermost/server/v8/platform/services/imageproxy"
	"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
)

type configService interface {
	Config() *model.Config
	AddConfigListener(listener func(*model.Config, *model.Config)) string
	RemoveConfigListener(id string)
	UpdateConfig(f func(*model.Config))
	SaveConfig(newCfg *model.Config, sendConfigChangeClusterMessage bool) (*model.Config, *model.Config, *model.AppError)
}

// Channels contains all channels related state.
type Channels struct {
	srv             *Server
	cfgSvc          configService
	filestore       filestore.FileBackend
	exportFilestore filestore.FileBackend

	postActionCookieSecret []byte

	pluginCommandsLock            sync.RWMutex
	pluginCommands                []*PluginCommand
	pluginsLock                   sync.RWMutex
	pluginsEnvironment            *plugin.Environment
	pluginConfigListenerID        string
	pluginClusterLeaderListenerID string

	imageProxy *imageproxy.ImageProxy

	agentsBridge AgentsBridge

	// cached counts that are used during notice condition validation
	cachedPostCount   int64
	cachedUserCount   int64
	cachedDBMSVersion string
	// previously fetched notices
	cachedNotices model.ProductNotices

	managedCategoryGroupID string
	managedCategoryFieldID string

	AccountMigration einterfaces.AccountMigrationInterface
	Compliance       einterfaces.ComplianceInterface
	DataRetention    einterfaces.DataRetentionInterface
	MessageExport    einterfaces.MessageExportInterface
	Saml             einterfaces.SamlInterface
	Notification     einterfaces.NotificationInterface
	Ldap             einterfaces.LdapInterface
	AccessControl    einterfaces.AccessControlServiceInterface
	Intune           einterfaces.IntuneInterface

	attributeViewRefreshMut  sync.Mutex
	attributeViewRefreshLast time.Time

	// These are used to prevent concurrent upload requests
	// for a given upload session which could cause inconsistencies
	// and data corruption.
	uploadLockMapMut sync.Mutex
	uploadLockMap    map[string]bool

	imgDecoder *imaging.Decoder
	imgEncoder *imaging.Encoder

	dndTaskMut sync.Mutex
	dndTask    *model.ScheduledTask

	postReminderMut  sync.Mutex
	postReminderTask *model.ScheduledTask

	interruptQuitChan chan struct{}
	scheduledPostMut  sync.Mutex
	scheduledPostTask *model.ScheduledTask
}

func NewChannels(s *Server) (*Channels, error) {
	ch := &Channels{
		srv:               s,
		imageProxy:        imageproxy.MakeImageProxy(s.platform, s.httpService, s.Log()),
		uploadLockMap:     map[string]bool{},
		filestore:         s.FileBackend(),
		exportFilestore:   s.ExportFileBackend(),
		cfgSvc:            s.Platform(),
		interruptQuitChan: make(chan struct{}),
	}

	if s.agentsBridgeOverride != nil {
		ch.agentsBridge = s.agentsBridgeOverride
	} else {
		ch.agentsBridge = newLiveAgentsBridge(ch)
	}

	// We are passing a partially filled Channels struct so that the enterprise
	// methods can have access to app methods.
	// Otherwise, passing server would mean it has to call s.Channels(),
	// which would be nil at this point.
	if complianceInterface != nil {
		ch.Compliance = complianceInterface(New(ServerConnector(ch)))
	}
	if messageExportInterface != nil {
		ch.MessageExport = messageExportInterface(New(ServerConnector(ch)))
	}
	if dataRetentionInterface != nil {
		ch.DataRetention = dataRetentionInterface(New(ServerConnector(ch)))
	}
	if accountMigrationInterface != nil {
		ch.AccountMigration = accountMigrationInterface(New(ServerConnector(ch)))
	}
	if ldapInterface != nil {
		ch.Ldap = ldapInterface(New(ServerConnector(ch)))
	}
	if notificationInterface != nil {
		ch.Notification = notificationInterface(New(ServerConnector(ch)))
	}
	if samlInterface != nil {
		ch.Saml = samlInterface(New(ServerConnector(ch)))
		if err := ch.Saml.ConfigureSP(request.EmptyContext(s.Log())); err != nil {
			s.Log().Error("An error occurred while configuring SAML Service Provider", mlog.Err(err))
		}

		ch.AddConfigListener(func(_, _ *model.Config) {
			if err := ch.Saml.ConfigureSP(request.EmptyContext(s.Log())); err != nil {
				s.Log().Error("An error occurred while configuring SAML Service Provider", mlog.Err(err))
			}
		})
	}

	if intuneInterface != nil {
		ch.Intune = intuneInterface(New(ServerConnector(ch)))
	}

	if pushProxyInterface != nil {
		app := New(ServerConnector(ch))
		s.PushProxy = pushProxyInterface(app)

		// Add config listener to regenerate token when push proxy URL changes
		app.AddConfigListener(func(oldCfg, newCfg *model.Config) {
			// Only cluster leader should regenerate to avoid duplicate requests
			if !app.IsLeader() {
				return
			}

			oldURL := model.SafeDereference(oldCfg.EmailSettings.PushNotificationServer)
			newURL := model.SafeDereference(newCfg.EmailSettings.PushNotificationServer)

			// If push proxy URL changed
			if oldURL != newURL {
				if newURL != "" {
					// URL changed to a new value, regenerate token
					s.Log().Info("Push notification server URL changed, regenerating auth token",
						mlog.String("old_url", oldURL),
						mlog.String("new_url", newURL))

					if err := s.PushProxy.GenerateAuthToken(); err != nil {
						s.Log().Error("Failed to regenerate auth token after config change", mlog.Err(err))
					}
				} else if oldURL != "" {
					// URL was cleared, delete the old token
					s.Log().Info("Push notification server URL cleared, removing auth token")
					if err := s.PushProxy.DeleteAuthToken(); err != nil {
						s.Log().Error("Failed to delete auth token after URL cleared", mlog.Err(err))
					}
				}
			}
		})
	}

	if accessControlServiceInterface != nil {
		app := New(ServerConnector(ch))
		ch.AccessControl = accessControlServiceInterface(app)

		appErr := ch.AccessControl.Init(request.EmptyContext(s.Log()))
		if appErr != nil && appErr.StatusCode != http.StatusNotImplemented {
			s.Log().Error("An error occurred while initializing Access Control", mlog.Err(appErr))
		}

		app.AddLicenseListener(func(newCfg, old *model.License) {
			if ch.AccessControl != nil {
				if appErr := ch.AccessControl.Init(request.EmptyContext(s.Log())); appErr != nil && appErr.StatusCode != http.StatusNotImplemented {
					s.Log().Error("An error occurred while initializing Access Control", mlog.Err(appErr))
				}
			}
		})
	}

	var imgErr error
	decoderConcurrency := int(*ch.cfgSvc.Config().FileSettings.MaxImageDecoderConcurrency)
	if decoderConcurrency == -1 {
		decoderConcurrency = runtime.NumCPU()
	}
	ch.imgDecoder, imgErr = imaging.NewDecoder(imaging.DecoderOptions{
		ConcurrencyLevel: decoderConcurrency,
	})
	if imgErr != nil {
		return nil, errors.Wrap(imgErr, "failed to create image decoder")
	}
	ch.imgEncoder, imgErr = imaging.NewEncoder(imaging.EncoderOptions{
		ConcurrencyLevel: runtime.NumCPU(),
	})
	if imgErr != nil {
		return nil, errors.Wrap(imgErr, "failed to create image encoder")
	}

	// Setup routes.
	pluginsRoute := ch.srv.Router.PathPrefix("/plugins/{plugin_id:[A-Za-z0-9\\_\\-\\.]+}").Subrouter()
	pluginsRoute.HandleFunc("", ch.ServePluginRequest)
	pluginsRoute.HandleFunc("/public/{public_file:.*}", ch.ServePluginPublicRequest)
	pluginsRoute.HandleFunc("/{anything:.*}", ch.ServePluginRequest)

	return ch, nil
}

func (ch *Channels) SetAgentsBridge(bridge AgentsBridge) {
	ch.agentsBridge = bridge
}

func (ch *Channels) Start() error {
	// Start plugins
	ctx := request.EmptyContext(ch.srv.Log())
	ch.initPlugins(ctx, *ch.cfgSvc.Config().PluginSettings.Directory, *ch.cfgSvc.Config().PluginSettings.ClientDirectory)

	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		select {
		case <-interruptChan:
			if err := ch.Stop(); err != nil {
				ch.srv.Log().Warn("Error stopping channels", mlog.Err(err))
			}
			os.Exit(1)
		case <-ch.interruptQuitChan:
			return
		}
	}()

	ch.AddConfigListener(func(prevCfg, cfg *model.Config) {
		// We compute the difference between configs
		// to ensure we don't re-init plugins unnecessarily.
		diffs, err := config.Diff(prevCfg, cfg)
		if err != nil {
			ch.srv.Log().Warn("Error in comparing configs", mlog.Err(err))
			return
		}

		hasDiff := false
		// TODO: This could be a method on ConfigDiffs itself
		for _, diff := range diffs {
			if strings.HasPrefix(diff.Path, "PluginSettings.") {
				hasDiff = true
				break
			}
		}

		// Do only if some plugin related settings has changed.
		if hasDiff {
			if *cfg.PluginSettings.Enable {
				ch.initPlugins(ctx, *cfg.PluginSettings.Directory, *ch.cfgSvc.Config().PluginSettings.ClientDirectory)
			} else {
				ch.ShutDownPlugins()
			}
		}
	})

	// TODO: This should be moved to the platform service.
	if err := ch.srv.platform.EnsureAsymmetricSigningKey(); err != nil {
		return errors.Wrapf(err, "unable to ensure asymmetric signing key")
	}

	if err := ch.ensurePostActionCookieSecret(); err != nil {
		return errors.Wrapf(err, "unable to ensure PostAction cookie secret")
	}

	return nil
}

func (ch *Channels) Stop() error {
	ch.ShutDownPlugins()

	ch.dndTaskMut.Lock()
	if ch.dndTask != nil {
		ch.dndTask.Cancel()
	}
	ch.dndTaskMut.Unlock()

	close(ch.interruptQuitChan)

	return nil
}

func (ch *Channels) AddConfigListener(listener func(*model.Config, *model.Config)) string {
	return ch.cfgSvc.AddConfigListener(listener)
}

func (ch *Channels) RemoveConfigListener(id string) {
	ch.cfgSvc.RemoveConfigListener(id)
}

func (ch *Channels) RunMultiHook(hookRunnerFunc func(hooks plugin.Hooks, manifest *model.Manifest) bool, hookId int) {
	if env := ch.GetPluginsEnvironment(); env != nil {
		env.RunMultiPluginHook(hookRunnerFunc, hookId)
	}
}

// RunMultiHookWithRPCErr dispatches a hook closure across active plugins, surfacing RPC transport
// errors. Returns nil in two cases that callers must distinguish themselves: (a) the plugin
// environment is unavailable (plugins disabled, or not yet initialized), so the closure was never
// invoked; (b) iteration completed and every closure invocation returned nil. Callers that need
// fail-closed semantics on case (a) must check `ch.GetPluginsEnvironment() != nil` at the call site
// before invoking; the right policy when plugins are disabled by config is caller-specific.
func (ch *Channels) RunMultiHookWithRPCErr(hookRunnerFunc func(hooks plugin.HooksWithRPCErr, manifest *model.Manifest) (bool, error), hookId int) error {
	if env := ch.GetPluginsEnvironment(); env != nil {
		return env.RunMultiPluginHookWithRPCErr(hookRunnerFunc, hookId)
	}
	return nil
}

func (ch *Channels) HooksForPlugin(id string) (plugin.Hooks, error) {
	env := ch.GetPluginsEnvironment()
	if env == nil {
		return nil, errors.New("plugins are not initialized")
	}

	hooks, err := env.HooksForPlugin(id)
	if err != nil {
		return nil, err
	}

	return hooks, nil
}