From 540ccc599be98ea130b48ae4b8df6ab90259e44b Mon Sep 17 00:00:00 2001 From: Doug Lauder Date: Tue, 7 Apr 2026 12:49:54 -0400 Subject: [PATCH] MM-68179: Run sendLoop workers on all HA nodes (#35909) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * MM-68179: Run sendLoop workers on all HA nodes In HA clusters, sendLoop worker goroutines only ran on the leader node. When an API request to send a channel invite landed on a non-leader node, SendMsg enqueued the task to a local in-memory channel but no goroutine consumed it, silently losing the message. Fix by starting sendLoop workers in Start() on all nodes, independent of the leader-only ping lifecycle. - Separate sendLoop lifecycle (Start/Shutdown) from ping lifecycle (pingStart/pingStop on leader change) - Rename resume/pause to pingStart/pingStop for clarity - Change Active() to mean "service started" via atomic.Bool - Remove SetActive (no longer needed; tests use Start()) * address review comment * Added idempotency guard to Start() * Start() and Shutdown(): CompareAndSwap instead of Load/Store — eliminates races where concurrent calls could both proceed. Only the winner of the CAS executes; the loser returns nil immediately. Ping test: replaced time.Sleep with assert.Never/assert.Eventually — no more brittle fixed sleeps. Uses assert.Never to verify no pings fire on non-leader, and assert.Eventually to verify pings stop after losing leadership (snapshot-then-compare pattern). * make unit tests parallel capable --------- Co-authored-by: Mattermost Build --- .../api4/shared_channel_metadata_test.go | 4 - .../api4/shared_channel_test_utils.go | 3 - ..._global_user_sync_self_referential_test.go | 5 - ...l_membership_sync_self_referential_test.go | 5 - ...hannel_sync_self_referential_utils_test.go | 3 - .../services/remotecluster/ping_test.go | 2 - .../services/remotecluster/send_test.go | 3 +- .../remotecluster/sendprofileImage_test.go | 5 +- .../services/remotecluster/service.go | 97 ++--- .../services/remotecluster/service_test.go | 379 ++++++++++++++++++ 10 files changed, 425 insertions(+), 81 deletions(-) diff --git a/server/channels/api4/shared_channel_metadata_test.go b/server/channels/api4/shared_channel_metadata_test.go index a1f43aaabd9..e537ebaa993 100644 --- a/server/channels/api4/shared_channel_metadata_test.go +++ b/server/channels/api4/shared_channel_metadata_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "github.com/mattermost/mattermost/server/public/model" - "github.com/mattermost/mattermost/server/v8/platform/services/remotecluster" "github.com/mattermost/mattermost/server/v8/platform/services/sharedchannel" ) @@ -54,9 +53,6 @@ func setupTestEnvironment(t *testing.T) (*TestHelper, *sharedchannel.Service) { rcService := th.App.Srv().GetRemoteClusterService() if rcService != nil { _ = rcService.Start() - if rc, ok := rcService.(*remotecluster.Service); ok { - rc.SetActive(true) - } require.True(t, rcService.Active(), "RemoteClusterService should be active") } diff --git a/server/channels/api4/shared_channel_test_utils.go b/server/channels/api4/shared_channel_test_utils.go index ddd0e41a561..b22b1e31f22 100644 --- a/server/channels/api4/shared_channel_test_utils.go +++ b/server/channels/api4/shared_channel_test_utils.go @@ -235,9 +235,6 @@ func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) { rcService := th.App.Srv().GetRemoteClusterService() if rcService != nil { - if rc, ok := rcService.(*remotecluster.Service); ok { - rc.SetActive(true) - } require.Eventually(t, func() bool { return rcService.Active() }, 5*time.Second, 100*time.Millisecond, "Remote cluster service should be active") diff --git a/server/channels/app/shared_channel_global_user_sync_self_referential_test.go b/server/channels/app/shared_channel_global_user_sync_self_referential_test.go index 58b5684dc2e..80f8df595fe 100644 --- a/server/channels/app/shared_channel_global_user_sync_self_referential_test.go +++ b/server/channels/app/shared_channel_global_user_sync_self_referential_test.go @@ -61,11 +61,6 @@ func TestSharedChannelGlobalUserSyncSelfReferential(t *testing.T) { if rcService != nil { _ = rcService.Start() - // Force the service to be active in test environment - if rc, ok := rcService.(*remotecluster.Service); ok { - rc.SetActive(true) - } - // Verify it's active if !rcService.Active() { t.Fatalf("RemoteClusterService is not active after Start") diff --git a/server/channels/app/shared_channel_membership_sync_self_referential_test.go b/server/channels/app/shared_channel_membership_sync_self_referential_test.go index 98c0da0e597..c48ed3e3258 100644 --- a/server/channels/app/shared_channel_membership_sync_self_referential_test.go +++ b/server/channels/app/shared_channel_membership_sync_self_referential_test.go @@ -48,11 +48,6 @@ func TestSharedChannelMembershipSyncSelfReferential(t *testing.T) { if rcService != nil { _ = rcService.Start() - // Force the service to be active in test environment - if rc, ok := rcService.(*remotecluster.Service); ok { - rc.SetActive(true) - } - // Wait for remote cluster service to be active require.Eventually(t, func() bool { return rcService.Active() diff --git a/server/channels/app/shared_channel_sync_self_referential_utils_test.go b/server/channels/app/shared_channel_sync_self_referential_utils_test.go index 1866ab2e4d3..1c678b89e02 100644 --- a/server/channels/app/shared_channel_sync_self_referential_utils_test.go +++ b/server/channels/app/shared_channel_sync_self_referential_utils_test.go @@ -300,9 +300,6 @@ func EnsureCleanState(t *testing.T, th *TestHelper, ss store.Store) { rcService := th.App.Srv().GetRemoteClusterService() if rcService != nil { - if rc, ok := rcService.(*remotecluster.Service); ok { - rc.SetActive(true) - } require.Eventually(t, func() bool { return rcService.Active() }, 5*time.Second, 100*time.Millisecond, "Remote cluster service should be active") diff --git a/server/platform/services/remotecluster/ping_test.go b/server/platform/services/remotecluster/ping_test.go index 4f3d38fc216..42d5bbcdd21 100644 --- a/server/platform/services/remotecluster/ping_test.go +++ b/server/platform/services/remotecluster/ping_test.go @@ -24,8 +24,6 @@ const ( ) func TestPing(t *testing.T) { - disablePing = false - t.Run("No error", func(t *testing.T) { merr := merror.New() diff --git a/server/platform/services/remotecluster/send_test.go b/server/platform/services/remotecluster/send_test.go index 63b4088f5fb..a1a934f024b 100644 --- a/server/platform/services/remotecluster/send_test.go +++ b/server/platform/services/remotecluster/send_test.go @@ -34,7 +34,6 @@ type testPayload struct { func TestBroadcastMsg(t *testing.T) { msgId := model.NewId() - disablePing = true t.Run("No error", func(t *testing.T) { var countCallbacks atomic.Int32 @@ -87,6 +86,7 @@ func TestBroadcastMsg(t *testing.T) { service, err := NewRemoteClusterService(mockServer, mockApp) require.NoError(t, err) + service.disablePing = true err = service.Start() require.NoError(t, err) @@ -144,6 +144,7 @@ func TestBroadcastMsg(t *testing.T) { service, err := NewRemoteClusterService(mockServer, mockApp) require.NoError(t, err) + service.disablePing = true err = service.Start() require.NoError(t, err) diff --git a/server/platform/services/remotecluster/sendprofileImage_test.go b/server/platform/services/remotecluster/sendprofileImage_test.go index 7fa7003b176..95cf6ff193e 100644 --- a/server/platform/services/remotecluster/sendprofileImage_test.go +++ b/server/platform/services/remotecluster/sendprofileImage_test.go @@ -27,10 +27,6 @@ const ( ) func TestService_sendProfileImageToRemote(t *testing.T) { - hadPing := disablePing - disablePing = true - defer func() { disablePing = hadPing }() - shouldError := &flag{} ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -107,6 +103,7 @@ func TestService_sendProfileImageToRemote(t *testing.T) { service, err := NewRemoteClusterService(mockServer, mockApp) require.NoError(t, err) + service.disablePing = true err = service.Start() require.NoError(t, err) diff --git a/server/platform/services/remotecluster/service.go b/server/platform/services/remotecluster/service.go index e21f2f021ed..265ec0f1b89 100644 --- a/server/platform/services/remotecluster/service.go +++ b/server/platform/services/remotecluster/service.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/mattermost/mattermost/server/public/model" @@ -36,10 +37,6 @@ const ( InviteExpiresAfter = time.Hour * 48 ) -var ( - disablePing bool // override for testing -) - type ServerIface interface { Config() *model.Config IsLeader() bool @@ -83,18 +80,20 @@ type ConnectionStateListener func(rc *model.RemoteCluster, online bool) // Service provides inter-cluster communication via topic based messages. In product these are called "Secured Connections". type Service struct { - server ServerIface - app AppIface - httpClient *http.Client - send []chan any + server ServerIface + app AppIface + httpClient *http.Client + send []chan any + done chan struct{} // signals sendLoop workers to stop; lifecycle: Start -> Shutdown + active atomic.Bool // true after Start(), false after Shutdown() + disablePing bool // when true, pingStart skips launching the ping loop; for testing // everything below guarded by `mux` mux sync.RWMutex - active bool leaderListenerId string topicListeners map[string]map[string]TopicListener // maps topic id to a map of listenerid->listener connectionStateListeners map[string]ConnectionStateListener // maps listener id to listener - done chan struct{} + pingDone chan struct{} // signals ping workers to stop; lifecycle: pingStart -> pingStop pingFreq time.Duration } @@ -140,6 +139,14 @@ func NewRemoteClusterService(server ServerIface, app AppIface) (*Service, error) // Start is called by the server on server start-up. func (rcs *Service) Start() error { + if !rcs.active.CompareAndSwap(false, true) { + return nil + } + rcs.done = make(chan struct{}) + for i := range rcs.send { + go rcs.sendLoop(i, rcs.done) + } + rcs.mux.Lock() rcs.leaderListenerId = rcs.server.AddClusterLeaderChangedListener(rcs.onClusterLeaderChange) rcs.mux.Unlock() @@ -151,17 +158,22 @@ func (rcs *Service) Start() error { // Shutdown is called by the server on server shutdown. func (rcs *Service) Shutdown() error { - rcs.server.RemoveClusterLeaderChangedListener(rcs.leaderListenerId) - rcs.pause() + if !rcs.active.CompareAndSwap(true, false) { + return nil + } + rcs.mux.Lock() + id := rcs.leaderListenerId + rcs.mux.Unlock() + + rcs.server.RemoveClusterLeaderChangedListener(id) + rcs.pingStop() + close(rcs.done) return nil } -// Active returns true if this instance of the remote cluster service is active. -// The active instance is responsible for pinging and sending messages to remotes. +// Active returns true if this instance of the remote cluster service has been started. func (rcs *Service) Active() bool { - rcs.mux.Lock() - defer rcs.mux.Unlock() - return rcs.active + return rcs.active.Load() } // GetPingFreq gets the frequency of pings to each remote. @@ -244,64 +256,41 @@ func (rcs *Service) RemoveConnectionStateListener(listenerId string) { // onClusterLeaderChange is called whenever the cluster leader may have changed. func (rcs *Service) onClusterLeaderChange() { if rcs.server.IsLeader() { - rcs.resume() + rcs.pingStart() } else { - rcs.pause() + rcs.pingStop() } } -func (rcs *Service) resume() { +func (rcs *Service) pingStart() { rcs.mux.Lock() defer rcs.mux.Unlock() - if rcs.active { - return // already active + if rcs.pingDone != nil { + return } - rcs.active = true - rcs.done = make(chan struct{}) + rcs.pingDone = make(chan struct{}) - if !disablePing { + if !rcs.disablePing { // first ping all the plugin remotes immediately, synchronously. rcs.pingAllNow(model.RemoteClusterQueryFilter{OnlyPlugins: true}) // start the async ping loop - rcs.pingLoop(rcs.done) + rcs.pingLoop(rcs.pingDone) } - // create thread pool for concurrent message sending. - for i := range rcs.send { - go rcs.sendLoop(i, rcs.done) - } - - rcs.server.Log().Debug("Remote Cluster Service active") + rcs.server.Log().Debug("Remote Cluster Service ping active") } -func (rcs *Service) pause() { +func (rcs *Service) pingStop() { rcs.mux.Lock() defer rcs.mux.Unlock() - if !rcs.active { - return // already inactive - } - rcs.active = false - close(rcs.done) - rcs.done = nil - - rcs.server.Log().Debug("Remote Cluster Service inactive") -} - -// SetActive forces the service to be active or inactive for testing -func (rcs *Service) SetActive(active bool) { - rcs.mux.Lock() - defer rcs.mux.Unlock() - - if rcs.active == active { + if rcs.pingDone == nil { return } + close(rcs.pingDone) + rcs.pingDone = nil - if active { - rcs.resume() - } else { - rcs.pause() - } + rcs.server.Log().Debug("Remote Cluster Service ping inactive") } diff --git a/server/platform/services/remotecluster/service_test.go b/server/platform/services/remotecluster/service_test.go index 4491c26b022..7b03e9b7b40 100644 --- a/server/platform/services/remotecluster/service_test.go +++ b/server/platform/services/remotecluster/service_test.go @@ -4,13 +4,24 @@ package remotecluster import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/plugin/plugintest/mock" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/v8/channels/store" + "github.com/mattermost/mattermost/server/v8/channels/store/storetest/mocks" + "github.com/mattermost/mattermost/server/v8/einterfaces" ) func TestService_AddTopicListener(t *testing.T) { @@ -71,3 +82,371 @@ func TestService_AddTopicListener(t *testing.T) { listeners = service.getTopicListeners("test") assert.Empty(t, listeners) } + +// leaderAwareMockServer is a mock server that supports toggling leader state +// and firing leader-change listeners, allowing lifecycle tests to simulate +// HA cluster leader transitions. +type leaderAwareMockServer struct { + remotes []*model.RemoteCluster + logger *mlog.Logger + isLeader atomic.Bool + + mux sync.Mutex + listeners map[string]func() +} + +func newLeaderAwareMockServer(t *testing.T, remotes []*model.RemoteCluster, leader bool) *leaderAwareMockServer { + ms := &leaderAwareMockServer{ + remotes: remotes, + logger: mlog.CreateConsoleTestLogger(t), + listeners: make(map[string]func()), + } + ms.isLeader.Store(leader) + return ms +} + +func (ms *leaderAwareMockServer) Config() *model.Config { return nil } +func (ms *leaderAwareMockServer) GetMetrics() einterfaces.MetricsInterface { return nil } +func (ms *leaderAwareMockServer) IsLeader() bool { return ms.isLeader.Load() } +func (ms *leaderAwareMockServer) Log() *mlog.Logger { return ms.logger } + +func (ms *leaderAwareMockServer) AddClusterLeaderChangedListener(listener func()) string { + ms.mux.Lock() + defer ms.mux.Unlock() + id := model.NewId() + ms.listeners[id] = listener + return id +} + +func (ms *leaderAwareMockServer) RemoveClusterLeaderChangedListener(id string) { + ms.mux.Lock() + defer ms.mux.Unlock() + delete(ms.listeners, id) +} + +// setLeader changes leader status and fires all registered listeners. +func (ms *leaderAwareMockServer) setLeader(leader bool) { + ms.isLeader.Store(leader) + ms.mux.Lock() + listeners := make([]func(), 0, len(ms.listeners)) + for _, l := range ms.listeners { + listeners = append(listeners, l) + } + ms.mux.Unlock() + for _, l := range listeners { + l() + } +} + +func (ms *leaderAwareMockServer) GetStore() store.Store { + anyQueryFilter := mock.MatchedBy(func(filter model.RemoteClusterQueryFilter) bool { + return true + }) + anyId := mock.AnythingOfType("string") + + remoteClusterStoreMock := &mocks.RemoteClusterStore{} + remoteClusterStoreMock.On("GetByTopic", "share").Return(ms.remotes, nil) + remoteClusterStoreMock.On("GetAll", 0, 999999, anyQueryFilter).Return(ms.remotes, nil) + remoteClusterStoreMock.On("SetLastPingAt", anyId).Return(nil) + + storeMock := &mocks.Store{} + storeMock.On("RemoteCluster").Return(remoteClusterStoreMock) + return storeMock +} + +func TestServiceLifecycle(t *testing.T) { + t.Run("Active after Start, inactive after Shutdown", func(t *testing.T) { + mockServer := newLeaderAwareMockServer(t, nil, false) + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + + assert.False(t, service.Active(), "service should not be active before Start") + + err = service.Start() + require.NoError(t, err) + + assert.True(t, service.Active(), "service should be active after Start") + + err = service.Shutdown() + require.NoError(t, err) + + assert.False(t, service.Active(), "service should not be active after Shutdown") + }) + + t.Run("Double Start is idempotent", func(t *testing.T) { + mockServer := newLeaderAwareMockServer(t, nil, false) + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + + err = service.Start() + require.NoError(t, err) + + firstDone := service.done + + // Second Start should be a no-op + err = service.Start() + require.NoError(t, err) + + assert.Equal(t, firstDone, service.done, "second Start should not replace done channel") + + require.NoError(t, service.Shutdown()) + }) + + t.Run("Shutdown before Start does not panic", func(t *testing.T) { + mockServer := newLeaderAwareMockServer(t, nil, false) + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + + err = service.Shutdown() + require.NoError(t, err) + }) + + t.Run("Active on non-leader node", func(t *testing.T) { + mockServer := newLeaderAwareMockServer(t, nil, false) + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + + err = service.Start() + require.NoError(t, err) + defer func() { require.NoError(t, service.Shutdown()) }() + + assert.True(t, service.Active(), "service should be active on non-leader node") + }) +} + +func TestSendLoopLifecycle(t *testing.T) { + t.Run("sendLoop runs on non-leader node", func(t *testing.T) { + var webReqCount atomic.Int32 + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + webReqCount.Add(1) + w.WriteHeader(200) + resp := Response{} + b, _ := json.Marshal(&resp) + _, _ = w.Write(b) + })) + defer ts.Close() + + remotes := makeRemoteClusters(3, ts.URL, false) + mockServer := newLeaderAwareMockServer(t, remotes, false) // non-leader + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + service.disablePing = true + + err = service.Start() + require.NoError(t, err) + defer func() { require.NoError(t, service.Shutdown()) }() + + // Verify pings are NOT running (non-leader) + service.mux.RLock() + pingRunning := service.pingDone != nil + service.mux.RUnlock() + assert.False(t, pingRunning, "pings should not be running on non-leader node") + + // Send a message — sendLoop should process it even on non-leader + msg := makeRemoteClusterMsg(model.NewId(), NoteContent) + var callbackCount atomic.Int32 + wg := &sync.WaitGroup{} + wg.Add(3) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + err = service.BroadcastMsg(ctx, msg, func(msg model.RemoteClusterMsg, remote *model.RemoteCluster, resp *Response, err error) { + defer wg.Done() + callbackCount.Add(1) + }) + require.NoError(t, err) + + wg.Wait() + + assert.Equal(t, int32(3), callbackCount.Load(), "all callbacks should fire on non-leader node") + assert.Equal(t, int32(3), webReqCount.Load(), "all HTTP requests should be made on non-leader node") + }) + + t.Run("sendLoop survives leader transitions", func(t *testing.T) { + var webReqCount atomic.Int32 + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + webReqCount.Add(1) + w.WriteHeader(200) + resp := Response{} + b, _ := json.Marshal(&resp) + _, _ = w.Write(b) + })) + defer ts.Close() + + remotes := makeRemoteClusters(3, ts.URL, false) + mockServer := newLeaderAwareMockServer(t, remotes, true) // start as leader + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + service.disablePing = true + + err = service.Start() + require.NoError(t, err) + defer func() { require.NoError(t, service.Shutdown()) }() + + // Lose leadership + mockServer.setLeader(false) + + // Send a message — should still work after losing leadership + msg := makeRemoteClusterMsg(model.NewId(), NoteContent) + wg := &sync.WaitGroup{} + wg.Add(3) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + err = service.BroadcastMsg(ctx, msg, func(msg model.RemoteClusterMsg, remote *model.RemoteCluster, resp *Response, err error) { + defer wg.Done() + }) + require.NoError(t, err) + + wg.Wait() + + assert.Equal(t, int32(3), webReqCount.Load(), "sends should work after losing leadership") + }) +} + +func TestPingLifecycle(t *testing.T) { + t.Run("pings start on leader, stop on non-leader", func(t *testing.T) { + mockServer := newLeaderAwareMockServer(t, nil, false) + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + + err = service.Start() + require.NoError(t, err) + defer func() { require.NoError(t, service.Shutdown()) }() + + // Non-leader: pings should not be running + service.mux.RLock() + assert.Nil(t, service.pingDone, "pingDone should be nil on non-leader") + service.mux.RUnlock() + + // Become leader: pings should start + mockServer.setLeader(true) + + service.mux.RLock() + assert.NotNil(t, service.pingDone, "pingDone should be set after becoming leader") + service.mux.RUnlock() + + // Lose leadership: pings should stop + mockServer.setLeader(false) + + service.mux.RLock() + assert.Nil(t, service.pingDone, "pingDone should be nil after losing leadership") + service.mux.RUnlock() + }) + + t.Run("pingStart is idempotent", func(t *testing.T) { + mockServer := newLeaderAwareMockServer(t, nil, true) + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + + err = service.Start() + require.NoError(t, err) + defer func() { require.NoError(t, service.Shutdown()) }() + + // pingDone should already be set (started as leader) + service.mux.RLock() + firstPingDone := service.pingDone + service.mux.RUnlock() + require.NotNil(t, firstPingDone) + + // Call pingStart again — should be a no-op, same channel + service.pingStart() + + service.mux.RLock() + secondPingDone := service.pingDone + service.mux.RUnlock() + + assert.Equal(t, firstPingDone, secondPingDone, "pingStart should be idempotent") + }) + + t.Run("pingStop is idempotent", func(t *testing.T) { + mockServer := newLeaderAwareMockServer(t, nil, false) + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + + err = service.Start() + require.NoError(t, err) + defer func() { require.NoError(t, service.Shutdown()) }() + + // Already non-leader, pingDone is nil + service.mux.RLock() + assert.Nil(t, service.pingDone) + service.mux.RUnlock() + + // Calling pingStop again should not panic + service.pingStop() + + service.mux.RLock() + assert.Nil(t, service.pingDone) + service.mux.RUnlock() + }) + + t.Run("pings fire on leader with real ping loop", func(t *testing.T) { + var pingCount atomic.Int32 + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + pingCount.Add(1) + w.WriteHeader(200) + resp := model.RemoteClusterPing{} + b, _ := json.Marshal(&resp) + _, _ = w.Write(b) + })) + defer ts.Close() + + remotes := makeRemoteClusters(2, ts.URL, false) + mockServer := newLeaderAwareMockServer(t, remotes, false) // start as non-leader + mockApp := newMockApp(t, nil) + + service, err := NewRemoteClusterService(mockServer, mockApp) + require.NoError(t, err) + service.SetPingFreq(time.Millisecond * 50) + + err = service.Start() + require.NoError(t, err) + defer func() { require.NoError(t, service.Shutdown()) }() + + // Non-leader: no pings should be sent + assert.Never(t, func() bool { + return pingCount.Load() > 0 + }, time.Millisecond*200, time.Millisecond*50, "no pings should fire on non-leader") + + // Become leader: pings should start firing + mockServer.setLeader(true) + + assert.Eventually(t, func() bool { + return pingCount.Load() >= 2 + }, time.Second*5, time.Millisecond*50, "pings should fire after becoming leader") + + // Lose leadership: pings should stop + mockServer.setLeader(false) + + // Allow in-flight pings to drain, then verify no new pings arrive + assert.Eventually(t, func() bool { + snapshot := pingCount.Load() + time.Sleep(time.Millisecond * 150) + return pingCount.Load() == snapshot + }, time.Second*5, time.Millisecond*50, "no new pings should fire after losing leadership") + }) +}