MM-68179: Run sendLoop workers on all HA nodes (#35909)

* MM-68179: Run sendLoop workers on all HA nodes

  In HA clusters, sendLoop worker goroutines only ran on the leader node.
  When an API request to send a channel invite landed on a non-leader node,
  SendMsg enqueued the task to a local in-memory channel but no goroutine
  consumed it, silently losing the message. Fix by starting sendLoop workers
  in Start() on all nodes, independent of the leader-only ping lifecycle.

  - Separate sendLoop lifecycle (Start/Shutdown) from ping lifecycle
    (pingStart/pingStop on leader change)
  - Rename resume/pause to pingStart/pingStop for clarity
  - Change Active() to mean "service started" via atomic.Bool
  - Remove SetActive (no longer needed; tests use Start())

* address review comment

* Added idempotency guard to Start()

* Start() and Shutdown(): CompareAndSwap instead of Load/Store — eliminates races where concurrent calls could both proceed. Only the winner of the CAS executes; the loser returns nil
  immediately.

* Ping test: replaced the fixed time.Sleep waits with assert.Never/assert.Eventually polling. Uses assert.Never to verify no pings fire on non-leader, and assert.Eventually to
  verify pings stop after losing leadership (snapshot-then-compare pattern, which still pauses briefly between the two reads).

* make unit tests parallel capable

---------

Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
Doug Lauder 2026-04-07 12:49:54 -04:00 committed by GitHub
parent 83819e3db4
commit 540ccc599b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 425 additions and 81 deletions

View file

@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
"github.com/mattermost/mattermost/server/v8/platform/services/sharedchannel"
)
@ -54,9 +53,6 @@ func setupTestEnvironment(t *testing.T) (*TestHelper, *sharedchannel.Service) {
rcService := th.App.Srv().GetRemoteClusterService()
if rcService != nil {
_ = rcService.Start()
if rc, ok := rcService.(*remotecluster.Service); ok {
rc.SetActive(true)
}
require.True(t, rcService.Active(), "RemoteClusterService should be active")
}

View file

@ -235,9 +235,6 @@ func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) {
rcService := th.App.Srv().GetRemoteClusterService()
if rcService != nil {
if rc, ok := rcService.(*remotecluster.Service); ok {
rc.SetActive(true)
}
require.Eventually(t, func() bool {
return rcService.Active()
}, 5*time.Second, 100*time.Millisecond, "Remote cluster service should be active")

View file

@ -61,11 +61,6 @@ func TestSharedChannelGlobalUserSyncSelfReferential(t *testing.T) {
if rcService != nil {
_ = rcService.Start()
// Force the service to be active in test environment
if rc, ok := rcService.(*remotecluster.Service); ok {
rc.SetActive(true)
}
// Verify it's active
if !rcService.Active() {
t.Fatalf("RemoteClusterService is not active after Start")

View file

@ -48,11 +48,6 @@ func TestSharedChannelMembershipSyncSelfReferential(t *testing.T) {
if rcService != nil {
_ = rcService.Start()
// Force the service to be active in test environment
if rc, ok := rcService.(*remotecluster.Service); ok {
rc.SetActive(true)
}
// Wait for remote cluster service to be active
require.Eventually(t, func() bool {
return rcService.Active()

View file

@ -300,9 +300,6 @@ func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) {
rcService := th.App.Srv().GetRemoteClusterService()
if rcService != nil {
if rc, ok := rcService.(*remotecluster.Service); ok {
rc.SetActive(true)
}
require.Eventually(t, func() bool {
return rcService.Active()
}, 5*time.Second, 100*time.Millisecond, "Remote cluster service should be active")

View file

@ -24,8 +24,6 @@ const (
)
func TestPing(t *testing.T) {
disablePing = false
t.Run("No error", func(t *testing.T) {
merr := merror.New()

View file

@ -34,7 +34,6 @@ type testPayload struct {
func TestBroadcastMsg(t *testing.T) {
msgId := model.NewId()
disablePing = true
t.Run("No error", func(t *testing.T) {
var countCallbacks atomic.Int32
@ -87,6 +86,7 @@ func TestBroadcastMsg(t *testing.T) {
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
service.disablePing = true
err = service.Start()
require.NoError(t, err)
@ -144,6 +144,7 @@ func TestBroadcastMsg(t *testing.T) {
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
service.disablePing = true
err = service.Start()
require.NoError(t, err)

View file

@ -27,10 +27,6 @@ const (
)
func TestService_sendProfileImageToRemote(t *testing.T) {
hadPing := disablePing
disablePing = true
defer func() { disablePing = hadPing }()
shouldError := &flag{}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -107,6 +103,7 @@ func TestService_sendProfileImageToRemote(t *testing.T) {
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
service.disablePing = true
err = service.Start()
require.NoError(t, err)

View file

@ -8,6 +8,7 @@ import (
"net"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/mattermost/mattermost/server/public/model"
@ -36,10 +37,6 @@ const (
InviteExpiresAfter = time.Hour * 48
)
var (
disablePing bool // override for testing
)
type ServerIface interface {
Config() *model.Config
IsLeader() bool
@ -83,18 +80,20 @@ type ConnectionStateListener func(rc *model.RemoteCluster, online bool)
// Service provides inter-cluster communication via topic based messages. In product these are called "Secured Connections".
type Service struct {
server ServerIface
app AppIface
httpClient *http.Client
send []chan any
server ServerIface
app AppIface
httpClient *http.Client
send []chan any
done chan struct{} // signals sendLoop workers to stop; lifecycle: Start -> Shutdown
active atomic.Bool // true after Start(), false after Shutdown()
disablePing bool // when true, pingStart skips launching the ping loop; for testing
// everything below guarded by `mux`
mux sync.RWMutex
active bool
leaderListenerId string
topicListeners map[string]map[string]TopicListener // maps topic id to a map of listenerid->listener
connectionStateListeners map[string]ConnectionStateListener // maps listener id to listener
done chan struct{}
pingDone chan struct{} // signals ping workers to stop; lifecycle: pingStart -> pingStop
pingFreq time.Duration
}
@ -140,6 +139,14 @@ func NewRemoteClusterService(server ServerIface, app AppIface) (*Service, error)
// Start is called by the server on server start-up.
func (rcs *Service) Start() error {
if !rcs.active.CompareAndSwap(false, true) {
return nil
}
rcs.done = make(chan struct{})
for i := range rcs.send {
go rcs.sendLoop(i, rcs.done)
}
rcs.mux.Lock()
rcs.leaderListenerId = rcs.server.AddClusterLeaderChangedListener(rcs.onClusterLeaderChange)
rcs.mux.Unlock()
@ -151,17 +158,22 @@ func (rcs *Service) Start() error {
// Shutdown is called by the server on server shutdown.
func (rcs *Service) Shutdown() error {
rcs.server.RemoveClusterLeaderChangedListener(rcs.leaderListenerId)
rcs.pause()
if !rcs.active.CompareAndSwap(true, false) {
return nil
}
rcs.mux.Lock()
id := rcs.leaderListenerId
rcs.mux.Unlock()
rcs.server.RemoveClusterLeaderChangedListener(id)
rcs.pingStop()
close(rcs.done)
return nil
}
// Active returns true if this instance of the remote cluster service is active.
// The active instance is responsible for pinging and sending messages to remotes.
// Active returns true if this instance of the remote cluster service has been started.
func (rcs *Service) Active() bool {
rcs.mux.Lock()
defer rcs.mux.Unlock()
return rcs.active
return rcs.active.Load()
}
// GetPingFreq gets the frequency of pings to each remote.
@ -244,64 +256,41 @@ func (rcs *Service) RemoveConnectionStateListener(listenerId string) {
// onClusterLeaderChange is called whenever the cluster leader may have changed.
func (rcs *Service) onClusterLeaderChange() {
if rcs.server.IsLeader() {
rcs.resume()
rcs.pingStart()
} else {
rcs.pause()
rcs.pingStop()
}
}
func (rcs *Service) resume() {
func (rcs *Service) pingStart() {
rcs.mux.Lock()
defer rcs.mux.Unlock()
if rcs.active {
return // already active
if rcs.pingDone != nil {
return
}
rcs.active = true
rcs.done = make(chan struct{})
rcs.pingDone = make(chan struct{})
if !disablePing {
if !rcs.disablePing {
// first ping all the plugin remotes immediately, synchronously.
rcs.pingAllNow(model.RemoteClusterQueryFilter{OnlyPlugins: true})
// start the async ping loop
rcs.pingLoop(rcs.done)
rcs.pingLoop(rcs.pingDone)
}
// create thread pool for concurrent message sending.
for i := range rcs.send {
go rcs.sendLoop(i, rcs.done)
}
rcs.server.Log().Debug("Remote Cluster Service active")
rcs.server.Log().Debug("Remote Cluster Service ping active")
}
func (rcs *Service) pause() {
func (rcs *Service) pingStop() {
rcs.mux.Lock()
defer rcs.mux.Unlock()
if !rcs.active {
return // already inactive
}
rcs.active = false
close(rcs.done)
rcs.done = nil
rcs.server.Log().Debug("Remote Cluster Service inactive")
}
// SetActive forces the service to be active or inactive for testing
func (rcs *Service) SetActive(active bool) {
rcs.mux.Lock()
defer rcs.mux.Unlock()
if rcs.active == active {
if rcs.pingDone == nil {
return
}
close(rcs.pingDone)
rcs.pingDone = nil
if active {
rcs.resume()
} else {
rcs.pause()
}
rcs.server.Log().Debug("Remote Cluster Service ping inactive")
}

View file

@ -4,13 +4,24 @@
package remotecluster
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin/plugintest/mock"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/channels/store/storetest/mocks"
"github.com/mattermost/mattermost/server/v8/einterfaces"
)
func TestService_AddTopicListener(t *testing.T) {
@ -71,3 +82,371 @@ func TestService_AddTopicListener(t *testing.T) {
listeners = service.getTopicListeners("test")
assert.Empty(t, listeners)
}
// leaderAwareMockServer is a test double for the server side of the remote
// cluster service. Its leadership flag can be flipped at will, and flipping it
// invokes every registered leader-change listener, which lets lifecycle tests
// simulate leader transitions in an HA cluster.
type leaderAwareMockServer struct {
	// remotes is what the mocked RemoteCluster store hands back to callers.
	remotes []*model.RemoteCluster
	// logger is returned by Log().
	logger *mlog.Logger

	// isLeader is the current leadership state reported by IsLeader().
	isLeader atomic.Bool

	// mux guards listeners.
	mux sync.Mutex
	// listeners maps a listener id to its leader-change callback.
	listeners map[string]func()
}
// newLeaderAwareMockServer builds a leaderAwareMockServer that serves the given
// remotes, logs through a console test logger bound to t, starts with an empty
// listener registry, and reports the requested initial leadership state.
func newLeaderAwareMockServer(t *testing.T, remotes []*model.RemoteCluster, leader bool) *leaderAwareMockServer {
	server := new(leaderAwareMockServer)
	server.remotes = remotes
	server.logger = mlog.CreateConsoleTestLogger(t)
	server.listeners = map[string]func(){}
	server.isLeader.Store(leader)
	return server
}
// Config always returns nil; this mock carries no server configuration.
func (ms *leaderAwareMockServer) Config() *model.Config {
	return nil
}
// GetMetrics always returns nil; this mock provides no metrics implementation.
func (ms *leaderAwareMockServer) GetMetrics() einterfaces.MetricsInterface {
	return nil
}
// IsLeader reports the leadership state most recently stored by the
// constructor or by setLeader.
func (ms *leaderAwareMockServer) IsLeader() bool {
	return ms.isLeader.Load()
}
// Log returns the test logger created when the mock server was constructed.
func (ms *leaderAwareMockServer) Log() *mlog.Logger {
	return ms.logger
}
// AddClusterLeaderChangedListener stores listener under a newly generated id
// and returns that id, which can later be passed to
// RemoveClusterLeaderChangedListener.
func (ms *leaderAwareMockServer) AddClusterLeaderChangedListener(listener func()) string {
	listenerID := model.NewId()

	ms.mux.Lock()
	defer ms.mux.Unlock()

	ms.listeners[listenerID] = listener
	return listenerID
}
// RemoveClusterLeaderChangedListener drops the listener registered under id.
// Removing an id that is not registered is a no-op.
func (ms *leaderAwareMockServer) RemoveClusterLeaderChangedListener(id string) {
	ms.mux.Lock()
	delete(ms.listeners, id)
	ms.mux.Unlock()
}
// setLeader stores the new leadership state and then calls every registered
// listener. The callbacks are copied out while holding the lock and invoked
// only after it has been released, so they never run under ms.mux.
func (ms *leaderAwareMockServer) setLeader(leader bool) {
	ms.isLeader.Store(leader)

	ms.mux.Lock()
	callbacks := make([]func(), 0, len(ms.listeners))
	for id := range ms.listeners {
		callbacks = append(callbacks, ms.listeners[id])
	}
	ms.mux.Unlock()

	for i := range callbacks {
		callbacks[i]()
	}
}
// GetStore returns a newly built mock store on every call. Its RemoteCluster
// store answers GetByTopic("share") and GetAll(0, 999999, <any filter>) with
// the remotes held by this mock server, and accepts SetLastPingAt for any
// string id.
func (ms *leaderAwareMockServer) GetStore() store.Store {
	rcStore := &mocks.RemoteClusterStore{}
	rcStore.On("GetByTopic", "share").Return(ms.remotes, nil)
	rcStore.On(
		"GetAll", 0, 999999,
		// Accept every query filter.
		mock.MatchedBy(func(model.RemoteClusterQueryFilter) bool { return true }),
	).Return(ms.remotes, nil)
	rcStore.On("SetLastPingAt", mock.AnythingOfType("string")).Return(nil)

	root := &mocks.Store{}
	root.On("RemoteCluster").Return(rcStore)
	return root
}
func TestServiceLifecycle(t *testing.T) {
t.Run("Active after Start, inactive after Shutdown", func(t *testing.T) {
mockServer := newLeaderAwareMockServer(t, nil, false)
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
assert.False(t, service.Active(), "service should not be active before Start")
err = service.Start()
require.NoError(t, err)
assert.True(t, service.Active(), "service should be active after Start")
err = service.Shutdown()
require.NoError(t, err)
assert.False(t, service.Active(), "service should not be active after Shutdown")
})
t.Run("Double Start is idempotent", func(t *testing.T) {
mockServer := newLeaderAwareMockServer(t, nil, false)
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
err = service.Start()
require.NoError(t, err)
firstDone := service.done
// Second Start should be a no-op
err = service.Start()
require.NoError(t, err)
assert.Equal(t, firstDone, service.done, "second Start should not replace done channel")
require.NoError(t, service.Shutdown())
})
t.Run("Shutdown before Start does not panic", func(t *testing.T) {
mockServer := newLeaderAwareMockServer(t, nil, false)
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
err = service.Shutdown()
require.NoError(t, err)
})
t.Run("Active on non-leader node", func(t *testing.T) {
mockServer := newLeaderAwareMockServer(t, nil, false)
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
err = service.Start()
require.NoError(t, err)
defer func() { require.NoError(t, service.Shutdown()) }()
assert.True(t, service.Active(), "service should be active on non-leader node")
})
}
func TestSendLoopLifecycle(t *testing.T) {
t.Run("sendLoop runs on non-leader node", func(t *testing.T) {
var webReqCount atomic.Int32
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
webReqCount.Add(1)
w.WriteHeader(200)
resp := Response{}
b, _ := json.Marshal(&resp)
_, _ = w.Write(b)
}))
defer ts.Close()
remotes := makeRemoteClusters(3, ts.URL, false)
mockServer := newLeaderAwareMockServer(t, remotes, false) // non-leader
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
service.disablePing = true
err = service.Start()
require.NoError(t, err)
defer func() { require.NoError(t, service.Shutdown()) }()
// Verify pings are NOT running (non-leader)
service.mux.RLock()
pingRunning := service.pingDone != nil
service.mux.RUnlock()
assert.False(t, pingRunning, "pings should not be running on non-leader node")
// Send a message — sendLoop should process it even on non-leader
msg := makeRemoteClusterMsg(model.NewId(), NoteContent)
var callbackCount atomic.Int32
wg := &sync.WaitGroup{}
wg.Add(3)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err = service.BroadcastMsg(ctx, msg, func(msg model.RemoteClusterMsg, remote *model.RemoteCluster, resp *Response, err error) {
defer wg.Done()
callbackCount.Add(1)
})
require.NoError(t, err)
wg.Wait()
assert.Equal(t, int32(3), callbackCount.Load(), "all callbacks should fire on non-leader node")
assert.Equal(t, int32(3), webReqCount.Load(), "all HTTP requests should be made on non-leader node")
})
t.Run("sendLoop survives leader transitions", func(t *testing.T) {
var webReqCount atomic.Int32
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
webReqCount.Add(1)
w.WriteHeader(200)
resp := Response{}
b, _ := json.Marshal(&resp)
_, _ = w.Write(b)
}))
defer ts.Close()
remotes := makeRemoteClusters(3, ts.URL, false)
mockServer := newLeaderAwareMockServer(t, remotes, true) // start as leader
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
service.disablePing = true
err = service.Start()
require.NoError(t, err)
defer func() { require.NoError(t, service.Shutdown()) }()
// Lose leadership
mockServer.setLeader(false)
// Send a message — should still work after losing leadership
msg := makeRemoteClusterMsg(model.NewId(), NoteContent)
wg := &sync.WaitGroup{}
wg.Add(3)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err = service.BroadcastMsg(ctx, msg, func(msg model.RemoteClusterMsg, remote *model.RemoteCluster, resp *Response, err error) {
defer wg.Done()
})
require.NoError(t, err)
wg.Wait()
assert.Equal(t, int32(3), webReqCount.Load(), "sends should work after losing leadership")
})
}
func TestPingLifecycle(t *testing.T) {
t.Run("pings start on leader, stop on non-leader", func(t *testing.T) {
mockServer := newLeaderAwareMockServer(t, nil, false)
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
err = service.Start()
require.NoError(t, err)
defer func() { require.NoError(t, service.Shutdown()) }()
// Non-leader: pings should not be running
service.mux.RLock()
assert.Nil(t, service.pingDone, "pingDone should be nil on non-leader")
service.mux.RUnlock()
// Become leader: pings should start
mockServer.setLeader(true)
service.mux.RLock()
assert.NotNil(t, service.pingDone, "pingDone should be set after becoming leader")
service.mux.RUnlock()
// Lose leadership: pings should stop
mockServer.setLeader(false)
service.mux.RLock()
assert.Nil(t, service.pingDone, "pingDone should be nil after losing leadership")
service.mux.RUnlock()
})
t.Run("pingStart is idempotent", func(t *testing.T) {
mockServer := newLeaderAwareMockServer(t, nil, true)
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
err = service.Start()
require.NoError(t, err)
defer func() { require.NoError(t, service.Shutdown()) }()
// pingDone should already be set (started as leader)
service.mux.RLock()
firstPingDone := service.pingDone
service.mux.RUnlock()
require.NotNil(t, firstPingDone)
// Call pingStart again — should be a no-op, same channel
service.pingStart()
service.mux.RLock()
secondPingDone := service.pingDone
service.mux.RUnlock()
assert.Equal(t, firstPingDone, secondPingDone, "pingStart should be idempotent")
})
t.Run("pingStop is idempotent", func(t *testing.T) {
mockServer := newLeaderAwareMockServer(t, nil, false)
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
err = service.Start()
require.NoError(t, err)
defer func() { require.NoError(t, service.Shutdown()) }()
// Already non-leader, pingDone is nil
service.mux.RLock()
assert.Nil(t, service.pingDone)
service.mux.RUnlock()
// Calling pingStop again should not panic
service.pingStop()
service.mux.RLock()
assert.Nil(t, service.pingDone)
service.mux.RUnlock()
})
t.Run("pings fire on leader with real ping loop", func(t *testing.T) {
var pingCount atomic.Int32
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pingCount.Add(1)
w.WriteHeader(200)
resp := model.RemoteClusterPing{}
b, _ := json.Marshal(&resp)
_, _ = w.Write(b)
}))
defer ts.Close()
remotes := makeRemoteClusters(2, ts.URL, false)
mockServer := newLeaderAwareMockServer(t, remotes, false) // start as non-leader
mockApp := newMockApp(t, nil)
service, err := NewRemoteClusterService(mockServer, mockApp)
require.NoError(t, err)
service.SetPingFreq(time.Millisecond * 50)
err = service.Start()
require.NoError(t, err)
defer func() { require.NoError(t, service.Shutdown()) }()
// Non-leader: no pings should be sent
assert.Never(t, func() bool {
return pingCount.Load() > 0
}, time.Millisecond*200, time.Millisecond*50, "no pings should fire on non-leader")
// Become leader: pings should start firing
mockServer.setLeader(true)
assert.Eventually(t, func() bool {
return pingCount.Load() >= 2
}, time.Second*5, time.Millisecond*50, "pings should fire after becoming leader")
// Lose leadership: pings should stop
mockServer.setLeader(false)
// Allow in-flight pings to drain, then verify no new pings arrive
assert.Eventually(t, func() bool {
snapshot := pingCount.Load()
time.Sleep(time.Millisecond * 150)
return pingCount.Load() == snapshot
}, time.Second*5, time.Millisecond*50, "no new pings should fire after losing leadership")
})
}