mattermost/server/channels/app/channels.go
Alejandro García Montoro 238867e247
MM-68732: Remove global mutex for login attempts in favour of database serialization (#36515)
* Add atomic login-attempt counter primitives to UserStore

Two new store methods back the upcoming switch from a global
per-node mutex to per-user atomic slot claiming:

  TryIncrementFailedPasswordAttempts(userID, maxAttempts) (bool, error)
    UPDATE Users SET FailedAttempts = FailedAttempts + 1
                 WHERE Id = ? AND FailedAttempts < maxAttempts
    Returns true when a slot was claimed (rows affected == 1) and
    false when the cap was already reached. The conditional UPDATE
    serialises concurrent attempts on the same user via the row
    lock, so the cap is enforced without any application-level
    locking and without serialising attempts across users.

  DecrementFailedPasswordAttempts(userID) error
    UPDATE Users SET FailedAttempts = FailedAttempts - 1
                 WHERE Id = ? AND FailedAttempts > 0
    Releases a slot previously claimed by TryIncrement when the
    in-flight authentication turns out not to be a credential
    failure. The conditional UPDATE means concurrent decrements
    cannot underflow.
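To make the semantics of the two conditional UPDATEs concrete, here is a minimal in-memory sketch in Go. The usersTable type and its tryIncrement/decrement methods are hypothetical stand-ins, not the real store API (which also returns an error). A single mutex plays the role of the row lock, so unlike the database it serialises all users rather than one row. A return value of true corresponds to rows affected == 1.

```go
package main

import (
	"fmt"
	"sync"
)

// usersTable is a hypothetical in-memory stand-in for the Users table.
// The single mutex models the row lock; a real database locks only the
// affected row, so attempts on different users do not serialise.
type usersTable struct {
	mu             sync.Mutex
	failedAttempts map[string]int
}

// tryIncrement mirrors:
//
//	UPDATE Users SET FailedAttempts = FailedAttempts + 1
//	WHERE Id = ? AND FailedAttempts < maxAttempts
//
// It returns true when a row was changed (a slot was claimed).
func (u *usersTable) tryIncrement(userID string, maxAttempts int) bool {
	u.mu.Lock()
	defer u.mu.Unlock()
	n, ok := u.failedAttempts[userID]
	if !ok || n >= maxAttempts {
		return false // unknown user or cap reached: zero rows affected
	}
	u.failedAttempts[userID] = n + 1
	return true
}

// decrement mirrors:
//
//	UPDATE Users SET FailedAttempts = FailedAttempts - 1
//	WHERE Id = ? AND FailedAttempts > 0
//
// The > 0 guard means the counter can never go below zero.
func (u *usersTable) decrement(userID string) bool {
	u.mu.Lock()
	defer u.mu.Unlock()
	n, ok := u.failedAttempts[userID]
	if !ok || n <= 0 {
		return false
	}
	u.failedAttempts[userID] = n - 1
	return true
}

func main() {
	tbl := &usersTable{failedAttempts: map[string]int{"u1": 0}}
	for i := 0; i < 4; i++ {
		fmt.Println(tbl.tryIncrement("u1", 3)) // true, true, true, then false at the cap
	}
	fmt.Println(tbl.decrement("u1"))           // true: slot released
	fmt.Println(tbl.failedAttempts["u1"])      // 2
	fmt.Println(tbl.tryIncrement("nobody", 3)) // false: no such row
}
```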

Storetest covers both primitives: claim-below-cap, reject-at-cap,
reject-above-cap, no-op for unknown user, and a 50-goroutine
concurrent test with a start barrier asserting exactly
maxAttempts slots are ever claimed and that decrement clamps at
zero under contention.
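The shape of that concurrent test can be sketched in plain Go. This sketch assumes a compare-and-swap loop on an int64 as an in-process stand-in for the conditional UPDATE, whereas the real storetest runs against the database. The helper names raceClaims and raceDecrements are hypothetical. Closing the start channel releases every goroutine at once, which acts as the start barrier.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tryIncrement stands in for the conditional UPDATE: the increment only
// lands while the counter is below maxAttempts.
func tryIncrement(counter *int64, maxAttempts int64) bool {
	for {
		cur := atomic.LoadInt64(counter)
		if cur >= maxAttempts {
			return false
		}
		if atomic.CompareAndSwapInt64(counter, cur, cur+1) {
			return true
		}
	}
}

// decrement stands in for "FailedAttempts - 1 WHERE FailedAttempts > 0".
func decrement(counter *int64) bool {
	for {
		cur := atomic.LoadInt64(counter)
		if cur <= 0 {
			return false
		}
		if atomic.CompareAndSwapInt64(counter, cur, cur-1) {
			return true
		}
	}
}

// raceClaims starts n goroutines behind a shared start barrier and counts
// how many of them manage to claim a slot.
func raceClaims(counter *int64, n int, maxAttempts int64) int64 {
	var claimed int64
	var wg sync.WaitGroup
	start := make(chan struct{})
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start // block until the barrier opens
			if tryIncrement(counter, maxAttempts) {
				atomic.AddInt64(&claimed, 1)
			}
		}()
	}
	close(start)
	wg.Wait()
	return claimed
}

// raceDecrements fires n concurrent decrements behind the same kind of barrier.
func raceDecrements(counter *int64, n int) {
	var wg sync.WaitGroup
	start := make(chan struct{})
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start
			decrement(counter)
		}()
	}
	close(start)
	wg.Wait()
}

func main() {
	var failed int64
	claimed := raceClaims(&failed, 50, 3)
	fmt.Println(claimed, failed) // 3 3
	raceDecrements(&failed, 50)
	fmt.Println(failed) // 0
}
```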

The testify mock is regenerated here so that *mocks.UserStore,
which the storetest package returns as a store.UserStore, still
satisfies the interface; the wrapper layers are regenerated in
the next commit.

------
AI assisted commit

* Regenerate store layers for the new primitives

Pick up TryIncrementFailedPasswordAttempts and
DecrementFailedPasswordAttempts in every generated wrapper:

  - retrylayer: retry on repeatable errors using the standard
    three-attempt loop.
  - timerlayer: record store-method duration metrics under
    UserStore.TryIncrementFailedPasswordAttempts and
    UserStore.DecrementFailedPasswordAttempts.
  - localcachelayer: invalidate the profile cache only after the
    underlying conditional UPDATE actually changes a row; an
    at-cap no-op return on TryIncrement no longer produces
    unnecessary cluster invalidation traffic.
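The invalidate-only-on-change rule can be sketched under the assumption of a trimmed-down, hypothetical interface. The names counterStore, memStore and cachingStore are illustrative and are not the generated layer's types. The wrapper forwards the call and records an invalidation only when a slot was actually claimed. Skipping invalidation on error is also an assumption of this sketch.

```go
package main

import "fmt"

// counterStore is a hypothetical, trimmed-down slice of the UserStore contract.
type counterStore interface {
	TryIncrementFailedPasswordAttempts(userID string, maxAttempts int) (bool, error)
}

// memStore is an in-memory stand-in for the SQL layer underneath.
type memStore struct{ attempts map[string]int }

func (m *memStore) TryIncrementFailedPasswordAttempts(userID string, maxAttempts int) (bool, error) {
	n, ok := m.attempts[userID]
	if !ok || n >= maxAttempts {
		return false, nil // conditional UPDATE matched no row
	}
	m.attempts[userID] = n + 1
	return true, nil
}

// cachingStore plays the role of the cache layer: it forwards the call and
// invalidates the cached profile only when a row actually changed.
type cachingStore struct {
	inner         counterStore
	invalidations []string // user IDs whose cached profile was invalidated
}

func (c *cachingStore) TryIncrementFailedPasswordAttempts(userID string, maxAttempts int) (bool, error) {
	claimed, err := c.inner.TryIncrementFailedPasswordAttempts(userID, maxAttempts)
	if err != nil {
		return false, err // assumption: nothing to invalidate on error
	}
	if claimed {
		c.invalidations = append(c.invalidations, userID)
	}
	return claimed, nil
}

func main() {
	c := &cachingStore{inner: &memStore{attempts: map[string]int{"u1": 2}}}
	c.TryIncrementFailedPasswordAttempts("u1", 3) // claims the last slot: invalidates
	c.TryIncrementFailedPasswordAttempts("u1", 3) // at the cap, no row changed: no invalidation
	fmt.Println(c.invalidations)                  // [u1]
}
```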

------
AI assisted commit

* Drop login-attempt mutex; use per-user slot claiming

Replace the global per-node mutex that serialised every login
attempt with the database-side atomic slot machine added on the
Users row. Each of the three authentication entry points now
pre-claims a slot via TryIncrementFailedPasswordAttempts before
running the expensive password / LDAP / MFA check, and releases
the slot when the failure path is not a real credential mismatch:

  - CheckPasswordAndAllCriteria (email/password): refunds the
    slot on backend errors during the password check (malformed
    stored hash, hasher misc failure, password-migration write
    failure) so a transient infra issue cannot ratchet
    FailedAttempts to a lockout for a user with valid credentials;
    refunds on the MFA pre-flight probe (empty mfaToken on an
    MFA-enabled user) so the probe is not counted as a real
    attempt.
  - DoubleCheckPassword: same backend-error refund predicate.
  - checkLdapUserPasswordAndAllCriteria: pre-claims only for
    existing users (first-time LDAP users have no local row to
    claim against); refunds non-credential DoLogin errors (server
    unreachable, transient) so an LDAP outage cannot lock out
    everyone; refunds the MFA pre-flight probe; for first-time
    users, explicitly bumps the counter via UpdateFailedPasswordAttempts
    on a real bad-password or bad-MFA attempt, matching the
    pre-refactor counting behaviour.

If the refund itself fails the underlying authentication error is
preserved and returned to the caller (the failure is logged); a
leaked slot is annoying, but masking the real failure with a
generic store 500 would be a clear observability regression.
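The pre-claim, refund and refund-failure rules can be combined into the control flow of a single check, sketched below. Everything here is hypothetical: checkLogin, slotStore, memSlots and the sentinel errors are illustrative names. This sketch also simplifies the refund decision. It treats any error that is not a credential mismatch as refundable, whereas the real entry points refund only the specific cases listed above.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// Hypothetical sentinel errors for the outcomes of the expensive check.
var (
	errTooManyAttempts = errors.New("too many failed login attempts")
	errBadPassword     = errors.New("invalid password")
	errBadMFA          = errors.New("invalid mfa token")
	errMFARequired     = errors.New("mfa token required") // pre-flight probe
	errBackend         = errors.New("backend failure during password check")
)

// slotStore is a hypothetical stand-in for the store primitives.
type slotStore interface {
	TryIncrement(userID string, maxAttempts int) (bool, error)
	Decrement(userID string) error
	Reset(userID string) error
}

// checkLogin claims a slot before running check, keeps it on a genuine
// credential failure, refunds it otherwise, and resets the counter on success.
func checkLogin(s slotStore, userID string, maxAttempts int, check func() error) error {
	claimed, err := s.TryIncrement(userID, maxAttempts)
	if err != nil {
		return err
	}
	if !claimed {
		return errTooManyAttempts
	}
	checkErr := check()
	switch {
	case checkErr == nil:
		return s.Reset(userID)
	case errors.Is(checkErr, errBadPassword), errors.Is(checkErr, errBadMFA):
		return checkErr // real failed attempt: the slot stays consumed
	default:
		// Not a credential failure: give the slot back. If the refund itself
		// fails, log it and still return the original authentication error.
		if rerr := s.Decrement(userID); rerr != nil {
			log.Printf("failed to refund login slot for %s: %v", userID, rerr)
		}
		return checkErr
	}
}

// memSlots is an in-memory slotStore; failRefund simulates a broken store.
type memSlots struct {
	attempts   map[string]int
	failRefund bool
}

func (m *memSlots) TryIncrement(userID string, maxAttempts int) (bool, error) {
	if m.attempts[userID] >= maxAttempts {
		return false, nil
	}
	m.attempts[userID]++
	return true, nil
}

func (m *memSlots) Decrement(userID string) error {
	if m.failRefund {
		return errors.New("store unavailable")
	}
	if m.attempts[userID] > 0 {
		m.attempts[userID]--
	}
	return nil
}

func (m *memSlots) Reset(userID string) error {
	m.attempts[userID] = 0
	return nil
}

func main() {
	s := &memSlots{attempts: map[string]int{}}
	// Prints, in order:
	//   mfa token required -> FailedAttempts=0
	//   invalid password -> FailedAttempts=1
	//   backend failure during password check -> FailedAttempts=1
	//   <nil> -> FailedAttempts=0
	for _, outcome := range []error{errMFARequired, errBadPassword, errBackend, nil} {
		o := outcome
		err := checkLogin(s, "u1", 3, func() error { return o })
		fmt.Printf("%v -> FailedAttempts=%d\n", err, s.attempts["u1"])
	}
}
```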

Cluster-wide behaviour also changes: the previous design honoured
MaximumLoginAttempts per node, so an n-node cluster effectively
permitted n * MaximumLoginAttempts attempts. The cap is now
enforced globally.
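The change in the effective budget is simple arithmetic. The hypothetical helper below makes the upper bound explicit, taking as an example a 3-node cluster with MaximumLoginAttempts = 10.

```go
package main

import "fmt"

// maxAttemptsBeforeLockout is a hypothetical helper returning the upper bound
// on failed attempts against one account before lockout.
func maxAttemptsBeforeLockout(nodes, maximumLoginAttempts int, perNodeCap bool) int {
	if perNodeCap {
		// Old design: each node enforced its own cap, so the budgets add up.
		return nodes * maximumLoginAttempts
	}
	// New design: one conditional UPDATE on the shared Users row.
	return maximumLoginAttempts
}

func main() {
	fmt.Println(maxAttemptsBeforeLockout(3, 10, true))  // 30
	fmt.Println(maxAttemptsBeforeLockout(3, 10, false)) // 10
}
```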

------
AI assisted commit

* Cover app-layer behaviors of the new login slot machine

The store-layer tests already exercise TryIncrement and Decrement
under concurrency and at the cap boundary. The new behavioural
contracts at the app layer were not covered, so a regression that
flipped a refund predicate, a probe condition, or a first-time
LDAP path would have slipped through type checking and existing
unit tests.

Add tests around the three callers of the new path:

  - CheckPasswordAndAllCriteria: an MFA pre-flight probe (empty
    token) does not consume a slot; a real attempt with a wrong
    non-empty token does; a backend error during the password
    check (malformed stored hash) refunds the slot; the happy
    path also asserts FailedAttempts resets to zero.
  - DoubleCheckPassword: gets its first test coverage, covering
    the happy path, rate-limit rejection once max attempts is
    reached, and the backend-error refund path.
  - checkLdapUserPasswordAndAllCriteria: covers the paths the table
    loop did not exercise: first-time LDAP user with a bad
    password (uses GetUserByAuth to reach the freshly created
    row), first-time LDAP user with a wrong MFA token, existing
    LDAP user with a non-credential DoLogin error (slot
    refunded), and the existing LDAP user MFA pre-flight probe
    (slot refunded).

------
AI assisted commit

* Address coderabbit review

------
AI assisted commit

* Fix race in first-time LDAP failed-attempt counter

For first-time LDAP users we have no local row to pre-claim, so
the bad-password and bad-MFA branches fell back to an absolute
UpdateFailedPasswordAttempts(id, ldapUser.FailedAttempts+1) based
on a snapshot from GetUserByAuth. Concurrent first-attempt
requests for the same user could all read FailedAttempts == 0 and
all write 1, losing increments. As a secondary issue the absolute
set did not enforce MaximumLoginAttempts, so the counter could
also drift past the cap.

Switch both branches to TryIncrementFailedPasswordAttempts, the
atomic conditional UPDATE already used on every other path. The
row lock serialises concurrent increments and the predicate caps
at MaximumLoginAttempts.
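The lost update is easiest to see with a deterministic interleaving rather than real goroutines. In this hypothetical sketch, every request reads its snapshot before any request writes, which is the worst case for the absolute set. The conditional variant applies each increment against the current stored value and stops at the cap.

```go
package main

import "fmt"

// absoluteSetInterleaved models the old first-time LDAP path: each request
// reads a snapshot of FailedAttempts and later writes snapshot+1. Here every
// read happens before any write, so all writes store the same value.
func absoluteSetInterleaved(requests int) int {
	stored := 0
	snapshots := make([]int, requests)
	for i := range snapshots {
		snapshots[i] = stored // GetUserByAuth-style read
	}
	for _, snap := range snapshots {
		stored = snap + 1 // absolute write of snapshot+1, no cap check
	}
	return stored
}

// conditionalIncrement models the conditional UPDATE: each increment is
// applied to the current stored value and only while it is below the cap.
func conditionalIncrement(requests, maxAttempts int) int {
	stored := 0
	for i := 0; i < requests; i++ {
		if stored < maxAttempts {
			stored++
		}
	}
	return stored
}

func main() {
	fmt.Println(absoluteSetInterleaved(3))  // 1: two increments lost
	fmt.Println(conditionalIncrement(3, 5)) // 3
	fmt.Println(conditionalIncrement(9, 3)) // 3: capped at maxAttempts
}
```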

A new concurrent storetest-style subtest runs
3 * maxFailedLoginAttempts goroutines through the first-time
bad-password path against the same fresh LDAP row and asserts
FailedAttempts lands at exactly maxFailedLoginAttempts. Against
the previous absolute-set implementation the test fails (observed
FailedAttempts = 4 with maxFailedLoginAttempts = 3, either a lost
increment or a cap overshoot).

The first-time bad-password branch also switches from a wrapped
500 return on store error to log-and-continue, matching the rest
of the file's refund/probe error handling: the underlying LDAP
authentication failure is the more useful error for the caller.

------
AI assisted commit

* Address review comments

------
AI assisted commit

---------

Co-authored-by: Mattermost Build <build@mattermost.com>
2026-05-18 10:16:32 +00:00

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package app

import (
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/pkg/errors"

	"github.com/mattermost/mattermost/server/public/model"
	"github.com/mattermost/mattermost/server/public/plugin"
	"github.com/mattermost/mattermost/server/public/shared/mlog"
	"github.com/mattermost/mattermost/server/public/shared/request"
	"github.com/mattermost/mattermost/server/v8/channels/app/imaging"
	"github.com/mattermost/mattermost/server/v8/config"
	"github.com/mattermost/mattermost/server/v8/einterfaces"
	"github.com/mattermost/mattermost/server/v8/platform/services/imageproxy"
	"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
)

type configService interface {
	Config() *model.Config
	AddConfigListener(listener func(*model.Config, *model.Config)) string
	RemoveConfigListener(id string)
	UpdateConfig(f func(*model.Config))
	SaveConfig(newCfg *model.Config, sendConfigChangeClusterMessage bool) (*model.Config, *model.Config, *model.AppError)
}

// Channels contains all channels related state.
type Channels struct {
	srv                    *Server
	cfgSvc                 configService
	filestore              filestore.FileBackend
	exportFilestore        filestore.FileBackend
	postActionCookieSecret []byte

	pluginCommandsLock sync.RWMutex
	pluginCommands     []*PluginCommand

	pluginsLock                   sync.RWMutex
	pluginsEnvironment            *plugin.Environment
	pluginConfigListenerID        string
	pluginClusterLeaderListenerID string

	imageProxy   *imageproxy.ImageProxy
	agentsBridge AgentsBridge

	// cached counts that are used during notice condition validation
	cachedPostCount   int64
	cachedUserCount   int64
	cachedDBMSVersion string

	// previously fetched notices
	cachedNotices model.ProductNotices

	managedCategoryGroupID string
	managedCategoryFieldID string

	AccountMigration einterfaces.AccountMigrationInterface
	Compliance       einterfaces.ComplianceInterface
	DataRetention    einterfaces.DataRetentionInterface
	MessageExport    einterfaces.MessageExportInterface
	Saml             einterfaces.SamlInterface
	Notification     einterfaces.NotificationInterface
	Ldap             einterfaces.LdapInterface
	AccessControl    einterfaces.AccessControlServiceInterface
	Intune           einterfaces.IntuneInterface

	attributeViewRefreshMut  sync.Mutex
	attributeViewRefreshLast time.Time

	// These are used to prevent concurrent upload requests
	// for a given upload session which could cause inconsistencies
	// and data corruption.
	uploadLockMapMut sync.Mutex
	uploadLockMap    map[string]bool

	imgDecoder *imaging.Decoder
	imgEncoder *imaging.Encoder

	dndTaskMut sync.Mutex
	dndTask    *model.ScheduledTask

	postReminderMut  sync.Mutex
	postReminderTask *model.ScheduledTask

	interruptQuitChan chan struct{}

	scheduledPostMut  sync.Mutex
	scheduledPostTask *model.ScheduledTask
}

func NewChannels(s *Server) (*Channels, error) {
	ch := &Channels{
		srv:               s,
		imageProxy:        imageproxy.MakeImageProxy(s.platform, s.httpService, s.Log()),
		uploadLockMap:     map[string]bool{},
		filestore:         s.FileBackend(),
		exportFilestore:   s.ExportFileBackend(),
		cfgSvc:            s.Platform(),
		interruptQuitChan: make(chan struct{}),
	}

	if s.agentsBridgeOverride != nil {
		ch.agentsBridge = s.agentsBridgeOverride
	} else {
		ch.agentsBridge = newLiveAgentsBridge(ch)
	}

	// We are passing a partially filled Channels struct so that the enterprise
	// methods can have access to app methods.
	// Otherwise, passing server would mean it has to call s.Channels(),
	// which would be nil at this point.
	if complianceInterface != nil {
		ch.Compliance = complianceInterface(New(ServerConnector(ch)))
	}
	if messageExportInterface != nil {
		ch.MessageExport = messageExportInterface(New(ServerConnector(ch)))
	}
	if dataRetentionInterface != nil {
		ch.DataRetention = dataRetentionInterface(New(ServerConnector(ch)))
	}
	if accountMigrationInterface != nil {
		ch.AccountMigration = accountMigrationInterface(New(ServerConnector(ch)))
	}
	if ldapInterface != nil {
		ch.Ldap = ldapInterface(New(ServerConnector(ch)))
	}
	if notificationInterface != nil {
		ch.Notification = notificationInterface(New(ServerConnector(ch)))
	}
	if samlInterface != nil {
		ch.Saml = samlInterface(New(ServerConnector(ch)))
		if err := ch.Saml.ConfigureSP(request.EmptyContext(s.Log())); err != nil {
			s.Log().Error("An error occurred while configuring SAML Service Provider", mlog.Err(err))
		}
		ch.AddConfigListener(func(_, _ *model.Config) {
			if err := ch.Saml.ConfigureSP(request.EmptyContext(s.Log())); err != nil {
				s.Log().Error("An error occurred while configuring SAML Service Provider", mlog.Err(err))
			}
		})
	}
	if intuneInterface != nil {
		ch.Intune = intuneInterface(New(ServerConnector(ch)))
	}
	if pushProxyInterface != nil {
		app := New(ServerConnector(ch))
		s.PushProxy = pushProxyInterface(app)

		// Add config listener to regenerate token when push proxy URL changes
		app.AddConfigListener(func(oldCfg, newCfg *model.Config) {
			// Only cluster leader should regenerate to avoid duplicate requests
			if !app.IsLeader() {
				return
			}

			oldURL := model.SafeDereference(oldCfg.EmailSettings.PushNotificationServer)
			newURL := model.SafeDereference(newCfg.EmailSettings.PushNotificationServer)

			// If push proxy URL changed
			if oldURL != newURL {
				if newURL != "" {
					// URL changed to a new value, regenerate token
					s.Log().Info("Push notification server URL changed, regenerating auth token",
						mlog.String("old_url", oldURL),
						mlog.String("new_url", newURL))
					if err := s.PushProxy.GenerateAuthToken(); err != nil {
						s.Log().Error("Failed to regenerate auth token after config change", mlog.Err(err))
					}
				} else if oldURL != "" {
					// URL was cleared, delete the old token
					s.Log().Info("Push notification server URL cleared, removing auth token")
					if err := s.PushProxy.DeleteAuthToken(); err != nil {
						s.Log().Error("Failed to delete auth token after URL cleared", mlog.Err(err))
					}
				}
			}
		})
	}

	if accessControlServiceInterface != nil {
		app := New(ServerConnector(ch))
		ch.AccessControl = accessControlServiceInterface(app)
		appErr := ch.AccessControl.Init(request.EmptyContext(s.Log()))
		if appErr != nil && appErr.StatusCode != http.StatusNotImplemented {
			s.Log().Error("An error occurred while initializing Access Control", mlog.Err(appErr))
		}

		app.AddLicenseListener(func(newCfg, old *model.License) {
			if ch.AccessControl != nil {
				if appErr := ch.AccessControl.Init(request.EmptyContext(s.Log())); appErr != nil && appErr.StatusCode != http.StatusNotImplemented {
					s.Log().Error("An error occurred while initializing Access Control", mlog.Err(appErr))
				}
			}
		})
	}

	var imgErr error
	decoderConcurrency := int(*ch.cfgSvc.Config().FileSettings.MaxImageDecoderConcurrency)
	if decoderConcurrency == -1 {
		decoderConcurrency = runtime.NumCPU()
	}
	ch.imgDecoder, imgErr = imaging.NewDecoder(imaging.DecoderOptions{
		ConcurrencyLevel: decoderConcurrency,
	})
	if imgErr != nil {
		return nil, errors.Wrap(imgErr, "failed to create image decoder")
	}
	ch.imgEncoder, imgErr = imaging.NewEncoder(imaging.EncoderOptions{
		ConcurrencyLevel: runtime.NumCPU(),
	})
	if imgErr != nil {
		return nil, errors.Wrap(imgErr, "failed to create image encoder")
	}

	// Setup routes.
	pluginsRoute := ch.srv.Router.PathPrefix("/plugins/{plugin_id:[A-Za-z0-9\\_\\-\\.]+}").Subrouter()
	pluginsRoute.HandleFunc("", ch.ServePluginRequest)
	pluginsRoute.HandleFunc("/public/{public_file:.*}", ch.ServePluginPublicRequest)
	pluginsRoute.HandleFunc("/{anything:.*}", ch.ServePluginRequest)

	return ch, nil
}

func (ch *Channels) SetAgentsBridge(bridge AgentsBridge) {
	ch.agentsBridge = bridge
}

func (ch *Channels) Start() error {
	// Start plugins
	ctx := request.EmptyContext(ch.srv.Log())
	ch.initPlugins(ctx, *ch.cfgSvc.Config().PluginSettings.Directory, *ch.cfgSvc.Config().PluginSettings.ClientDirectory)

	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		select {
		case <-interruptChan:
			if err := ch.Stop(); err != nil {
				ch.srv.Log().Warn("Error stopping channels", mlog.Err(err))
			}
			os.Exit(1)
		case <-ch.interruptQuitChan:
			return
		}
	}()

	ch.AddConfigListener(func(prevCfg, cfg *model.Config) {
		// We compute the difference between configs
		// to ensure we don't re-init plugins unnecessarily.
		diffs, err := config.Diff(prevCfg, cfg)
		if err != nil {
			ch.srv.Log().Warn("Error in comparing configs", mlog.Err(err))
			return
		}

		hasDiff := false
		// TODO: This could be a method on ConfigDiffs itself
		for _, diff := range diffs {
			if strings.HasPrefix(diff.Path, "PluginSettings.") {
				hasDiff = true
				break
			}
		}

		// Only act if some plugin-related settings have changed.
		if hasDiff {
			if *cfg.PluginSettings.Enable {
				ch.initPlugins(ctx, *cfg.PluginSettings.Directory, *ch.cfgSvc.Config().PluginSettings.ClientDirectory)
			} else {
				ch.ShutDownPlugins()
			}
		}
	})

	// TODO: This should be moved to the platform service.
	if err := ch.srv.platform.EnsureAsymmetricSigningKey(); err != nil {
		return errors.Wrapf(err, "unable to ensure asymmetric signing key")
	}

	if err := ch.ensurePostActionCookieSecret(); err != nil {
		return errors.Wrapf(err, "unable to ensure PostAction cookie secret")
	}

	return nil
}

func (ch *Channels) Stop() error {
	ch.ShutDownPlugins()

	ch.dndTaskMut.Lock()
	if ch.dndTask != nil {
		ch.dndTask.Cancel()
	}
	ch.dndTaskMut.Unlock()

	close(ch.interruptQuitChan)

	return nil
}

func (ch *Channels) AddConfigListener(listener func(*model.Config, *model.Config)) string {
	return ch.cfgSvc.AddConfigListener(listener)
}

func (ch *Channels) RemoveConfigListener(id string) {
	ch.cfgSvc.RemoveConfigListener(id)
}

func (ch *Channels) RunMultiHook(hookRunnerFunc func(hooks plugin.Hooks, manifest *model.Manifest) bool, hookId int) {
	if env := ch.GetPluginsEnvironment(); env != nil {
		env.RunMultiPluginHook(hookRunnerFunc, hookId)
	}
}

// RunMultiHookWithRPCErr dispatches a hook closure across active plugins, surfacing RPC transport
// errors. It returns nil in two cases that callers must distinguish themselves: (a) the plugin
// environment is unavailable (plugins disabled or not yet initialized), so the closure was never
// invoked; (b) iteration completed and every closure invocation returned nil. Callers that need
// fail-closed semantics for case (a) must check `ch.GetPluginsEnvironment() != nil` at the call site
// before invoking; the right policy when plugins are disabled by config is caller-specific.
func (ch *Channels) RunMultiHookWithRPCErr(hookRunnerFunc func(hooks plugin.HooksWithRPCErr, manifest *model.Manifest) (bool, error), hookId int) error {
	if env := ch.GetPluginsEnvironment(); env != nil {
		return env.RunMultiPluginHookWithRPCErr(hookRunnerFunc, hookId)
	}
	return nil
}

func (ch *Channels) HooksForPlugin(id string) (plugin.Hooks, error) {
	env := ch.GetPluginsEnvironment()
	if env == nil {
		return nil, errors.New("plugins are not initialized")
	}

	hooks, err := env.HooksForPlugin(id)
	if err != nil {
		return nil, err
	}

	return hooks, nil
}