2019-11-29 06:59:40 -05:00
|
|
|
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
|
|
|
// See LICENSE.txt for license information.
|
|
|
|
|
|
2022-10-06 04:04:21 -04:00
|
|
|
package platform
|
2017-11-13 14:59:51 -05:00
|
|
|
|
|
|
|
|
import (
|
2023-11-08 16:17:07 -05:00
|
|
|
"bytes"
|
2023-01-30 11:10:39 -05:00
|
|
|
"encoding/json"
|
2025-02-13 22:49:36 -05:00
|
|
|
"fmt"
|
2025-05-11 02:30:12 -04:00
|
|
|
"iter"
|
2017-11-13 14:59:51 -05:00
|
|
|
"net"
|
|
|
|
|
"net/http"
|
|
|
|
|
"net/http/httptest"
|
2023-03-24 01:52:37 -04:00
|
|
|
"runtime"
|
2025-05-11 02:30:12 -04:00
|
|
|
"slices"
|
2017-11-13 14:59:51 -05:00
|
|
|
"testing"
|
2018-11-22 04:53:44 -05:00
|
|
|
"time"
|
2017-11-13 14:59:51 -05:00
|
|
|
|
2021-03-24 05:59:23 -04:00
|
|
|
"github.com/gorilla/websocket"
|
2020-05-19 10:25:52 -04:00
|
|
|
"github.com/stretchr/testify/assert"
|
2025-09-12 02:35:06 -04:00
|
|
|
"github.com/stretchr/testify/mock"
|
2017-11-13 14:59:51 -05:00
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
|
2023-06-11 01:24:35 -04:00
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
|
|
|
"github.com/mattermost/mattermost/server/public/shared/i18n"
|
|
|
|
|
platform_mocks "github.com/mattermost/mattermost/server/v8/channels/app/platform/mocks"
|
|
|
|
|
"github.com/mattermost/mattermost/server/v8/channels/testlib"
|
2017-11-13 14:59:51 -05:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func dummyWebsocketHandler(t *testing.T) http.HandlerFunc {
|
|
|
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
2021-03-24 05:59:23 -04:00
|
|
|
upgrader := &websocket.Upgrader{
|
|
|
|
|
ReadBufferSize: 1024,
|
|
|
|
|
WriteBufferSize: 1024,
|
2017-11-13 14:59:51 -05:00
|
|
|
}
|
2021-03-24 05:59:23 -04:00
|
|
|
conn, err := upgrader.Upgrade(w, req, nil)
|
2017-11-13 14:59:51 -05:00
|
|
|
for err == nil {
|
2021-03-24 05:59:23 -04:00
|
|
|
_, _, err = conn.ReadMessage()
|
2017-11-13 14:59:51 -05:00
|
|
|
}
|
2021-03-24 05:59:23 -04:00
|
|
|
if _, ok := err.(*websocket.CloseError); !ok {
|
|
|
|
|
require.NoError(t, err)
|
2017-11-13 14:59:51 -05:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-06 04:04:21 -04:00
|
|
|
func registerDummyWebConn(t *testing.T, th *TestHelper, addr net.Addr, session *model.Session) *WebConn {
|
2021-03-24 05:59:23 -04:00
|
|
|
d := websocket.Dialer{}
|
|
|
|
|
c, _, err := d.Dial("ws://"+addr.String()+"/ws", nil)
|
2017-11-13 14:59:51 -05:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
cfg := &WebConnConfig{
|
|
|
|
|
WebSocket: c,
|
|
|
|
|
Session: *session,
|
|
|
|
|
TFunc: i18n.IdentityTfunc(),
|
|
|
|
|
Locale: "en",
|
|
|
|
|
}
|
2022-12-05 14:16:35 -05:00
|
|
|
wc := th.Service.NewWebConn(cfg, th.Suite, &hookRunner{})
|
2024-11-07 23:27:54 -05:00
|
|
|
require.NoError(t, th.Service.HubRegister(wc))
|
2017-11-13 14:59:51 -05:00
|
|
|
go wc.Pump()
|
|
|
|
|
return wc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestHubStopWithMultipleConnections(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2025-11-12 07:00:51 -05:00
|
|
|
th := Setup(t).InitBasic(t)
|
2017-11-13 14:59:51 -05:00
|
|
|
|
2019-10-28 05:57:21 -04:00
|
|
|
s := httptest.NewServer(dummyWebsocketHandler(t))
|
2017-11-13 14:59:51 -05:00
|
|
|
defer s.Close()
|
|
|
|
|
|
2023-10-11 07:08:55 -04:00
|
|
|
session, err := th.Service.CreateSession(th.Context, &model.Session{
|
2022-10-06 04:04:21 -04:00
|
|
|
UserId: th.BasicUser.Id,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
wc1 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
|
|
|
|
wc2 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
|
|
|
|
wc3 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
2018-08-07 13:52:51 -04:00
|
|
|
defer wc1.Close()
|
|
|
|
|
defer wc2.Close()
|
|
|
|
|
defer wc3.Close()
|
2017-11-13 14:59:51 -05:00
|
|
|
}
|
2018-11-22 04:53:44 -05:00
|
|
|
|
|
|
|
|
// TestHubStopRaceCondition verifies that attempts to use the hub after it has shutdown does not
|
|
|
|
|
// block the caller indefinitely.
|
|
|
|
|
func TestHubStopRaceCondition(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2025-11-12 07:00:51 -05:00
|
|
|
th := Setup(t).InitBasic(t)
|
|
|
|
|
|
2019-10-28 05:57:21 -04:00
|
|
|
s := httptest.NewServer(dummyWebsocketHandler(t))
|
2018-11-22 04:53:44 -05:00
|
|
|
|
2023-10-11 07:08:55 -04:00
|
|
|
session, err := th.Service.CreateSession(th.Context, &model.Session{
|
2022-10-06 04:04:21 -04:00
|
|
|
UserId: th.BasicUser.Id,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
wc1 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
2021-03-24 05:59:23 -04:00
|
|
|
defer wc1.Close()
|
2018-11-22 04:53:44 -05:00
|
|
|
|
2022-10-06 04:04:21 -04:00
|
|
|
hub := th.Service.hubs[0]
|
2025-11-12 07:00:51 -05:00
|
|
|
th.Shutdown(t)
|
2018-11-22 04:53:44 -05:00
|
|
|
|
|
|
|
|
done := make(chan bool)
|
|
|
|
|
go func() {
|
2022-10-06 04:04:21 -04:00
|
|
|
wc4 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
|
|
|
|
wc5 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
2024-11-07 23:27:54 -05:00
|
|
|
require.NoError(t, hub.Register(wc4))
|
|
|
|
|
require.NoError(t, hub.Register(wc5))
|
2018-11-22 04:53:44 -05:00
|
|
|
|
|
|
|
|
hub.UpdateActivity("userId", "sessionToken", 0)
|
|
|
|
|
|
2020-04-23 03:46:18 -04:00
|
|
|
for i := 0; i <= broadcastQueueSize; i++ {
|
2022-09-02 06:17:22 -04:00
|
|
|
hub.Broadcast(model.NewWebSocketEvent("", "", "", "", nil, ""))
|
2018-11-22 04:53:44 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hub.InvalidateUser("userId")
|
|
|
|
|
hub.Unregister(wc4)
|
|
|
|
|
hub.Unregister(wc5)
|
|
|
|
|
close(done)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-done:
|
|
|
|
|
case <-time.After(15 * time.Second):
|
2019-09-24 10:44:01 -04:00
|
|
|
require.FailNow(t, "hub call did not return within 15 seconds after stop")
|
2018-11-22 04:53:44 -05:00
|
|
|
}
|
|
|
|
|
}
|
2020-05-19 10:25:52 -04:00
|
|
|
|
2020-08-19 13:57:48 -04:00
|
|
|
// TestHubSessionRevokeRace tries to trigger a race between session cache
// invalidation and the hub broadcasting events to a connection, then asserts
// that the hub's broadcast queue still drains (i.e. the hub did not deadlock).
func TestHubSessionRevokeRace(t *testing.T) {
	mainHelper.Parallel(t)
	th := Setup(t)

	// This needs to be false for the condition to trigger
	th.Service.UpdateConfig(func(cfg *model.Config) {
		*cfg.ServiceSettings.ExtendSessionLengthWithActivity = false
	})

	s := httptest.NewServer(dummyWebsocketHandler(t))
	defer s.Close()

	session, err := th.Service.CreateSession(th.Context, &model.Session{
		UserId: model.NewId(),
	})
	require.NoError(t, err)

	wc1 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
	hub := th.Service.GetHubForUserId(wc1.UserId)

	// NOTE(review): done is closed by the broadcasting goroutine but never
	// received from — the polling loop at the bottom is the actual
	// synchronization used by this test.
	done := make(chan bool)

	time.Sleep(2 * time.Second)
	// We override the LastActivityAt which happens in NewWebConn.
	// This is needed to call RevokeSessionById which triggers the race.
	err = th.Service.AddSessionToCache(session)
	require.NoError(t, err)

	go func() {
		// Overfill the broadcast queue so the hub has work in flight while
		// the cache invalidation below runs concurrently.
		for i := 0; i <= broadcastQueueSize; i++ {
			hub.Broadcast(model.NewWebSocketEvent("", "teamID", "", "", nil, ""))
		}
		close(done)
	}()

	// This call should happen _after_ !wc.IsAuthenticated() and _before_ wc.isMemberOfTeam().
	// There's no guarantee this will happen. But that's our best bet to trigger this race.
	wc1.InvalidateCache()

	// Poll for up to ~10 seconds for the broadcast buffer to empty.
	for range 10 {
		// If broadcast buffer has not emptied,
		// we sleep for a second and check again
		if len(hub.broadcast) > 0 {
			time.Sleep(time.Second)
			continue
		}
	}
	if len(hub.broadcast) > 0 {
		require.Fail(t, "hub is deadlocked")
	}
}
|
|
|
|
|
|
2020-05-22 01:09:21 -04:00
|
|
|
// TestHubConnIndex exercises hubConnectionIndex bookkeeping: per-user lookup,
// per-connection-ID lookup, and per-channel lookup, running the user and
// connection-ID suites under both the fastIterate and non-fastIterate modes.
func TestHubConnIndex(t *testing.T) {
	mainHelper.Parallel(t)
	th := Setup(t).InitBasic(t)

	// Make both basic users members of the basic channel so the
	// channel-membership based indexing below has data to work with.
	_, err := th.Service.Store.Channel().SaveMember(th.Context, &model.ChannelMember{
		ChannelId:   th.BasicChannel.Id,
		UserId:      th.BasicUser.Id,
		NotifyProps: model.GetDefaultChannelNotifyProps(),
		SchemeGuest: th.BasicUser.IsGuest(),
		SchemeUser:  !th.BasicUser.IsGuest(),
	})
	require.NoError(t, err)
	_, err = th.Service.Store.Channel().SaveMember(th.Context, &model.ChannelMember{
		ChannelId:   th.BasicChannel.Id,
		UserId:      th.BasicUser2.Id,
		NotifyProps: model.GetDefaultChannelNotifyProps(),
		SchemeGuest: th.BasicUser2.IsGuest(),
		SchemeUser:  !th.BasicUser2.IsGuest(),
	})
	require.NoError(t, err)

	for _, fastIterate := range []bool{true, false} {
		t.Run(fmt.Sprintf("fastIterate=%t", fastIterate), func(t *testing.T) {
			// NOTE(review): this subtest and one of its children are both
			// named "Basic"; consider renaming one for clearer test output.
			t.Run("Basic", func(t *testing.T) {
				connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)

				// User1: a single connection.
				wc1 := &WebConn{
					Platform: th.Service,
					Suite:    th.Suite,
					UserId:   model.NewId(),
				}
				wc1.SetConnectionID(model.NewId())
				wc1.SetSession(&model.Session{})

				// User2: three connections (wc2, wc3, wc4) sharing one user ID.
				wc2 := &WebConn{
					Platform: th.Service,
					Suite:    th.Suite,
					UserId:   model.NewId(),
				}
				wc2.SetConnectionID(model.NewId())
				wc2.SetSession(&model.Session{})

				wc3 := &WebConn{
					Platform: th.Service,
					Suite:    th.Suite,
					UserId:   wc2.UserId,
				}
				wc3.SetConnectionID(model.NewId())
				wc3.SetSession(&model.Session{})

				wc4 := &WebConn{
					Platform: th.Service,
					Suite:    th.Suite,
					UserId:   wc2.UserId,
				}
				wc4.SetConnectionID(model.NewId())
				wc4.SetSession(&model.Session{})

				errAdd := connIndex.Add(wc1)
				require.NoError(t, errAdd)
				err = connIndex.Add(wc2)
				require.NoError(t, err)
				err = connIndex.Add(wc3)
				require.NoError(t, err)
				err = connIndex.Add(wc4)
				require.NoError(t, err)

				// The subtests below share connIndex state and depend on
				// running in order: each removal builds on the previous one.
				t.Run("Basic", func(t *testing.T) {
					assert.True(t, connIndex.Has(wc1))
					assert.True(t, connIndex.Has(wc2))

					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc2.UserId)), []*WebConn{wc2, wc3, wc4})
					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc1.UserId)), []*WebConn{wc1})
					assert.True(t, connIndex.Has(wc2))
					assert.True(t, connIndex.Has(wc1))
					assert.Len(t, connIndex.All(), 4)
				})

				t.Run("RemoveMiddleUser2", func(t *testing.T) {
					connIndex.Remove(wc3) // Remove from middle from user2

					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc2.UserId)), []*WebConn{wc2, wc4})
					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc1.UserId)), []*WebConn{wc1})
					assert.True(t, connIndex.Has(wc2))
					assert.False(t, connIndex.Has(wc3))
					assert.True(t, connIndex.Has(wc4))
					assert.Len(t, connIndex.All(), 3)
				})

				t.Run("RemoveUser1", func(t *testing.T) {
					connIndex.Remove(wc1) // Remove sole connection from user1

					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc2.UserId)), []*WebConn{wc2, wc4})
					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc1.UserId)), []*WebConn{})
					assert.Len(t, slices.Collect(connIndex.ForUser(wc1.UserId)), 0)
					assert.Len(t, connIndex.All(), 2)
					assert.False(t, connIndex.Has(wc1))
					assert.True(t, connIndex.Has(wc2))
				})

				t.Run("RemoveEndUser2", func(t *testing.T) {
					connIndex.Remove(wc4) // Remove from end from user2

					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc2.UserId)), []*WebConn{wc2})
					assert.ElementsMatch(t, slices.Collect(connIndex.ForUser(wc1.UserId)), []*WebConn{})
					assert.True(t, connIndex.Has(wc2))
					assert.False(t, connIndex.Has(wc3))
					assert.False(t, connIndex.Has(wc4))
					assert.Len(t, connIndex.All(), 1)
				})
			})

			t.Run("ByConnectionId", func(t *testing.T) {
				connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)

				// User1
				wc1ID := model.NewId()
				wc1 := &WebConn{
					Platform: th.Service,
					Suite:    th.Suite,
					UserId:   th.BasicUser.Id,
				}
				wc1.SetConnectionID(wc1ID)
				wc1.SetSession(&model.Session{})

				// User2: wc2 and wc3 share a user; only wc1 and wc3 are ever
				// added to the index below, so wc2ID lookups must return nil.
				wc2ID := model.NewId()
				wc2 := &WebConn{
					Platform: th.Service,
					Suite:    th.Suite,
					UserId:   th.BasicUser2.Id,
				}
				wc2.SetConnectionID(wc2ID)
				wc2.SetSession(&model.Session{})

				wc3ID := model.NewId()
				wc3 := &WebConn{
					Platform: th.Service,
					Suite:    th.Suite,
					UserId:   wc2.UserId,
				}
				wc3.SetConnectionID(wc3ID)
				wc3.SetSession(&model.Session{})

				t.Run("no connections", func(t *testing.T) {
					assert.False(t, connIndex.Has(wc1))
					assert.False(t, connIndex.Has(wc2))
					assert.False(t, connIndex.Has(wc3))
					assert.Empty(t, connIndex.byConnectionId)
				})

				t.Run("adding", func(t *testing.T) {
					err = connIndex.Add(wc1)
					require.NoError(t, err)
					err = connIndex.Add(wc3)
					require.NoError(t, err)

					assert.Len(t, connIndex.byConnectionId, 2)
					assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
					assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
					assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
				})

				t.Run("removing", func(t *testing.T) {
					connIndex.Remove(wc3)

					assert.Len(t, connIndex.byConnectionId, 1)
					assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
					assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
					assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
				})
			})
		})
	}

	// The channel-keyed index only exists in fastIterate mode, hence the
	// hard-coded true here.
	t.Run("ByChannelId", func(t *testing.T) {
		connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, true)

		// User1
		wc1ID := model.NewId()
		wc1 := &WebConn{
			Platform: th.Service,
			Suite:    th.Suite,
			UserId:   th.BasicUser.Id,
		}
		wc1.SetConnectionID(wc1ID)
		wc1.SetSession(&model.Session{})

		// User2
		wc2ID := model.NewId()
		wc2 := &WebConn{
			Platform: th.Service,
			Suite:    th.Suite,
			UserId:   th.BasicUser2.Id,
		}
		wc2.SetConnectionID(wc2ID)
		wc2.SetSession(&model.Session{})

		wc3ID := model.NewId()
		wc3 := &WebConn{
			Platform: th.Service,
			Suite:    th.Suite,
			UserId:   wc2.UserId,
		}
		wc3.SetConnectionID(wc3ID)
		wc3.SetSession(&model.Session{})

		err = connIndex.Add(wc1)
		require.NoError(t, err)
		err = connIndex.Add(wc2)
		require.NoError(t, err)
		err = connIndex.Add(wc3)
		require.NoError(t, err)

		t.Run("ForChannel", func(t *testing.T) {
			require.Len(t, connIndex.byChannelID, 1)
			ids := make([]string, 0)
			for c := range connIndex.ForChannel(th.BasicChannel.Id) {
				ids = append(ids, c.GetConnectionID())
			}
			require.ElementsMatch(t, []string{wc1ID, wc2ID, wc3ID}, ids)
			require.Len(t, slices.Collect(connIndex.ForChannel("notexist")), 0)
		})

		// Add user2 to a second channel, then verify the index picks it up
		// after its membership cache is invalidated.
		ch := th.CreateChannel(t, th.BasicTeam)
		_, err = th.Service.Store.Channel().SaveMember(th.Context, &model.ChannelMember{
			ChannelId:   ch.Id,
			UserId:      th.BasicUser2.Id,
			NotifyProps: model.GetDefaultChannelNotifyProps(),
			SchemeGuest: th.BasicUser2.IsGuest(),
			SchemeUser:  !th.BasicUser2.IsGuest(),
		})
		require.NoError(t, err)

		t.Run("InvalidateCMCacheForUser", func(t *testing.T) {
			require.NoError(t, connIndex.InvalidateCMCacheForUser(th.BasicUser2.Id))
			require.Len(t, connIndex.byChannelID, 2)
			require.Len(t, slices.Collect(connIndex.ForChannel(th.BasicChannel.Id)), 3)
			require.Len(t, slices.Collect(connIndex.ForChannel(ch.Id)), 2)
		})

		t.Run("Remove", func(t *testing.T) {
			// Removing wc3 drops user2's second connection from every
			// channel it was indexed under.
			connIndex.Remove(wc3)
			require.Len(t, connIndex.byChannelID, 2)
			require.Len(t, slices.Collect(connIndex.ForChannel(th.BasicChannel.Id)), 2)
		})
	})
}
|
|
|
|
|
|
2024-01-04 14:36:22 -05:00
|
|
|
func TestHubConnIndexIncorrectRemoval(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2024-01-04 14:36:22 -05:00
|
|
|
th := Setup(t)
|
|
|
|
|
|
2025-02-13 22:49:36 -05:00
|
|
|
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
2024-01-04 14:36:22 -05:00
|
|
|
|
|
|
|
|
// User2
|
|
|
|
|
wc2 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: model.NewId(),
|
|
|
|
|
}
|
|
|
|
|
wc2.SetConnectionID("first")
|
|
|
|
|
wc2.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
wc3 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: wc2.UserId,
|
|
|
|
|
}
|
|
|
|
|
wc3.SetConnectionID("myID")
|
|
|
|
|
wc3.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
wc4 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: wc2.UserId,
|
|
|
|
|
}
|
|
|
|
|
wc4.SetConnectionID("last")
|
|
|
|
|
wc4.SetSession(&model.Session{})
|
|
|
|
|
|
2025-04-29 04:23:53 -04:00
|
|
|
err := connIndex.Add(wc2)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = connIndex.Add(wc3)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = connIndex.Add(wc4)
|
|
|
|
|
require.NoError(t, err)
|
2024-01-04 14:36:22 -05:00
|
|
|
|
2025-05-11 02:30:12 -04:00
|
|
|
for wc := range connIndex.ForUser(wc2.UserId) {
|
2024-01-04 14:36:22 -05:00
|
|
|
if !connIndex.Has(wc) {
|
|
|
|
|
require.Failf(t, "Failed to find connection", "connection: %v", wc)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if connIndex.ForConnection("myID") != nil {
|
|
|
|
|
connIndex.Remove(wc)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
func TestHubConnIndexInactive(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
MM-46604: Fix racy access to session props (#20996)
The core mistake was that the webconn doesn't really go out of scope
once the connection disconnects. It is kept in the webhub connIndex
to be reconnected if the user connects again. This was the new
behavior as part of reliable websockets.
Therefore, it was a mistake to return the session to the pool
once the connection drops. Because the connection would still
recieve events from the web_hub.
And once you release the session, another login might acquire the
session and set some props, while the web_hub might still try
to send events to it, which will cause a read of the map prop.
The following test case illustrates such a race. It is very
hard to trigger it organically, hence I artificially wrote
the code.
The right fix is to release the session only when the connection
is stale and gets deleted from the conn index. The PR has been
load tested in `-race` mode just for extra sanity check.
```go
func TestHubSessionRace(t *testing.T) {
th := Setup(t).InitBasic()
defer th.TearDown()
s := httptest.NewServer(dummyWebsocketHandler(t))
defer s.Close()
th.Server.HubStart()
wc1 := registerDummyWebConn(t, th.App, s.Listener.Addr(), th.BasicUser.Id)
defer wc1.Close()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
token := wc1.GetSessionToken()
// Return to pool after *WebConn.Pump finishes
wc1.App.Srv().userService.ReturnSessionToPool(wc1.GetSession())
// A new HTTP requests acquires a session which gets it from the pool
sess, _ := wc1.App.GetSession(token)
// Login happens which sets some session properties
sess.AddProp(model.SessionPropPlatform, "chrome")
}()
go func() {
defer wg.Done()
// Called from *WebConn.shouldSendEvent
t.Log("session: ", wc1.GetSession().Props[model.SessionPropIsGuest] == "true")
}()
wg.Wait()
}
```
https://mattermost.atlassian.net/browse/MM-46604
```release-note
NONE
```
2022-09-14 04:57:24 -04:00
|
|
|
th := Setup(t)
|
|
|
|
|
|
2025-02-13 22:49:36 -05:00
|
|
|
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger, false)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
|
|
|
|
|
// User1
|
|
|
|
|
wc1 := &WebConn{
|
2022-10-06 04:04:21 -04:00
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: model.NewId(),
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
}
|
2024-06-03 09:44:53 -04:00
|
|
|
wc1.Active.Store(true)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
wc1.SetConnectionID("conn1")
|
MM-46604: Fix racy access to session props (#20996)
The core mistake was that the webconn doesn't really go out of scope
once the connection disconnects. It is kept in the webhub connIndex
to be reconnected if the user connects again. This was the new
behavior as part of reliable websockets.
Therefore, it was a mistake to return the session to the pool
once the connection drops. Because the connection would still
recieve events from the web_hub.
And once you release the session, another login might acquire the
session and set some props, while the web_hub might still try
to send events to it, which will cause a read of the map prop.
The following test case illustrates such a race. It is very
hard to trigger it organically, hence I artificially wrote
the code.
The right fix is to release the session only when the connection
is stale and gets deleted from the conn index. The PR has been
load tested in `-race` mode just for extra sanity check.
```go
func TestHubSessionRace(t *testing.T) {
th := Setup(t).InitBasic()
defer th.TearDown()
s := httptest.NewServer(dummyWebsocketHandler(t))
defer s.Close()
th.Server.HubStart()
wc1 := registerDummyWebConn(t, th.App, s.Listener.Addr(), th.BasicUser.Id)
defer wc1.Close()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
token := wc1.GetSessionToken()
// Return to pool after *WebConn.Pump finishes
wc1.App.Srv().userService.ReturnSessionToPool(wc1.GetSession())
// A new HTTP requests acquires a session which gets it from the pool
sess, _ := wc1.App.GetSession(token)
// Login happens which sets some session properties
sess.AddProp(model.SessionPropPlatform, "chrome")
}()
go func() {
defer wg.Done()
// Called from *WebConn.shouldSendEvent
t.Log("session: ", wc1.GetSession().Props[model.SessionPropIsGuest] == "true")
}()
wg.Wait()
}
```
https://mattermost.atlassian.net/browse/MM-46604
```release-note
NONE
```
2022-09-14 04:57:24 -04:00
|
|
|
wc1.SetSession(&model.Session{})
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
|
|
|
|
|
// User2
|
|
|
|
|
wc2 := &WebConn{
|
2022-10-06 04:04:21 -04:00
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: model.NewId(),
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
}
|
2024-06-03 09:44:53 -04:00
|
|
|
wc2.Active.Store(true)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
wc2.SetConnectionID("conn2")
|
MM-46604: Fix racy access to session props (#20996)
The core mistake was that the webconn doesn't really go out of scope
once the connection disconnects. It is kept in the webhub connIndex
to be reconnected if the user connects again. This was the new
behavior as part of reliable websockets.
Therefore, it was a mistake to return the session to the pool
once the connection drops. Because the connection would still
recieve events from the web_hub.
And once you release the session, another login might acquire the
session and set some props, while the web_hub might still try
to send events to it, which will cause a read of the map prop.
The following test case illustrates such a race. It is very
hard to trigger it organically, hence I artificially wrote
the code.
The right fix is to release the session only when the connection
is stale and gets deleted from the conn index. The PR has been
load tested in `-race` mode just for extra sanity check.
```go
func TestHubSessionRace(t *testing.T) {
th := Setup(t).InitBasic()
defer th.TearDown()
s := httptest.NewServer(dummyWebsocketHandler(t))
defer s.Close()
th.Server.HubStart()
wc1 := registerDummyWebConn(t, th.App, s.Listener.Addr(), th.BasicUser.Id)
defer wc1.Close()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
token := wc1.GetSessionToken()
// Return to pool after *WebConn.Pump finishes
wc1.App.Srv().userService.ReturnSessionToPool(wc1.GetSession())
// A new HTTP requests acquires a session which gets it from the pool
sess, _ := wc1.App.GetSession(token)
// Login happens which sets some session properties
sess.AddProp(model.SessionPropPlatform, "chrome")
}()
go func() {
defer wg.Done()
// Called from *WebConn.shouldSendEvent
t.Log("session: ", wc1.GetSession().Props[model.SessionPropIsGuest] == "true")
}()
wg.Wait()
}
```
https://mattermost.atlassian.net/browse/MM-46604
```release-note
NONE
```
2022-09-14 04:57:24 -04:00
|
|
|
wc2.SetSession(&model.Session{})
|
|
|
|
|
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
wc3 := &WebConn{
|
2022-10-06 04:04:21 -04:00
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: wc2.UserId,
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
}
|
2024-06-03 09:44:53 -04:00
|
|
|
wc3.Active.Store(false)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
wc3.SetConnectionID("conn3")
|
MM-46604: Fix racy access to session props (#20996)
The core mistake was that the webconn doesn't really go out of scope
once the connection disconnects. It is kept in the webhub connIndex
to be reconnected if the user connects again. This was the new
behavior as part of reliable websockets.
Therefore, it was a mistake to return the session to the pool
once the connection drops. Because the connection would still
recieve events from the web_hub.
And once you release the session, another login might acquire the
session and set some props, while the web_hub might still try
to send events to it, which will cause a read of the map prop.
The following test case illustrates such a race. It is very
hard to trigger it organically, hence I artificially wrote
the code.
The right fix is to release the session only when the connection
is stale and gets deleted from the conn index. The PR has been
load tested in `-race` mode just for extra sanity check.
```go
func TestHubSessionRace(t *testing.T) {
th := Setup(t).InitBasic()
defer th.TearDown()
s := httptest.NewServer(dummyWebsocketHandler(t))
defer s.Close()
th.Server.HubStart()
wc1 := registerDummyWebConn(t, th.App, s.Listener.Addr(), th.BasicUser.Id)
defer wc1.Close()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
token := wc1.GetSessionToken()
// Return to pool after *WebConn.Pump finishes
wc1.App.Srv().userService.ReturnSessionToPool(wc1.GetSession())
// A new HTTP requests acquires a session which gets it from the pool
sess, _ := wc1.App.GetSession(token)
// Login happens which sets some session properties
sess.AddProp(model.SessionPropPlatform, "chrome")
}()
go func() {
defer wg.Done()
// Called from *WebConn.shouldSendEvent
t.Log("session: ", wc1.GetSession().Props[model.SessionPropIsGuest] == "true")
}()
wg.Wait()
}
```
https://mattermost.atlassian.net/browse/MM-46604
```release-note
NONE
```
2022-09-14 04:57:24 -04:00
|
|
|
wc3.SetSession(&model.Session{})
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
|
2025-04-29 04:23:53 -04:00
|
|
|
err := connIndex.Add(wc1)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = connIndex.Add(wc2)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = connIndex.Add(wc3)
|
|
|
|
|
require.NoError(t, err)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
|
|
|
|
|
assert.Nil(t, connIndex.RemoveInactiveByConnectionID(wc2.UserId, "conn2"))
|
2024-04-30 09:58:55 -04:00
|
|
|
assert.Equal(t, connIndex.ForUserActiveCount(wc2.UserId), 1)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
assert.NotNil(t, connIndex.RemoveInactiveByConnectionID(wc2.UserId, "conn3"))
|
2024-04-30 09:58:55 -04:00
|
|
|
assert.Equal(t, connIndex.ForUserActiveCount(wc2.UserId), 1)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
assert.Nil(t, connIndex.RemoveInactiveByConnectionID(wc1.UserId, "conn3"))
|
|
|
|
|
assert.False(t, connIndex.Has(wc3))
|
2025-05-11 02:30:12 -04:00
|
|
|
assert.Len(t, slices.Collect(connIndex.ForUser(wc2.UserId)), 1)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
|
|
|
|
|
wc3.lastUserActivityAt = model.GetMillis()
|
2025-04-29 04:23:53 -04:00
|
|
|
err = connIndex.Add(wc3)
|
|
|
|
|
require.NoError(t, err)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
connIndex.RemoveInactiveConnections()
|
|
|
|
|
assert.True(t, connIndex.Has(wc3))
|
2025-05-11 02:30:12 -04:00
|
|
|
assert.Len(t, slices.Collect(connIndex.ForUser(wc2.UserId)), 2)
|
2024-04-30 09:58:55 -04:00
|
|
|
assert.Equal(t, connIndex.ForUserActiveCount(wc2.UserId), 1)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
assert.Len(t, connIndex.All(), 3)
|
|
|
|
|
|
|
|
|
|
wc3.lastUserActivityAt = model.GetMillis() - (time.Minute).Milliseconds()
|
|
|
|
|
connIndex.RemoveInactiveConnections()
|
|
|
|
|
assert.False(t, connIndex.Has(wc3))
|
2025-05-11 02:30:12 -04:00
|
|
|
assert.Len(t, slices.Collect(connIndex.ForUser(wc2.UserId)), 1)
|
2024-04-30 09:58:55 -04:00
|
|
|
assert.Equal(t, connIndex.ForUserActiveCount(wc2.UserId), 1)
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
assert.Len(t, connIndex.All(), 2)
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-22 07:43:38 -05:00
|
|
|
func TestReliableWebSocketSend(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2022-02-22 07:43:38 -05:00
|
|
|
testCluster := &testlib.FakeClusterInterface{}
|
|
|
|
|
|
2022-10-06 04:04:21 -04:00
|
|
|
th := SetupWithCluster(t, testCluster)
|
2022-02-23 10:01:23 -05:00
|
|
|
|
2022-09-02 06:17:22 -04:00
|
|
|
ev := model.NewWebSocketEvent("test_unreliable_event", "", "", "", nil, "")
|
2022-02-22 07:43:38 -05:00
|
|
|
ev = ev.SetBroadcast(&model.WebsocketBroadcast{})
|
2022-10-06 04:04:21 -04:00
|
|
|
th.Service.Publish(ev)
|
2022-09-02 06:17:22 -04:00
|
|
|
ev2 := model.NewWebSocketEvent("test_reliable_event", "", "", "", nil, "")
|
2022-10-06 04:04:21 -04:00
|
|
|
|
2022-02-23 10:01:23 -05:00
|
|
|
ev2 = ev2.SetBroadcast(&model.WebsocketBroadcast{
|
2022-02-22 07:43:38 -05:00
|
|
|
ReliableClusterSend: true,
|
|
|
|
|
})
|
2022-10-06 04:04:21 -04:00
|
|
|
th.Service.Publish(ev2)
|
2022-02-22 07:43:38 -05:00
|
|
|
|
|
|
|
|
messages := testCluster.GetMessages()
|
2022-02-23 10:01:23 -05:00
|
|
|
|
|
|
|
|
evJSON, err := ev.ToJSON()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
ev2JSON, err := ev2.ToJSON()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
require.Contains(t, messages, &model.ClusterMessage{
|
|
|
|
|
Event: model.ClusterEventPublish,
|
|
|
|
|
Data: evJSON,
|
|
|
|
|
SendType: model.ClusterSendBestEffort,
|
|
|
|
|
})
|
|
|
|
|
require.Contains(t, messages, &model.ClusterMessage{
|
|
|
|
|
Event: model.ClusterEventPublish,
|
|
|
|
|
Data: ev2JSON,
|
|
|
|
|
SendType: model.ClusterSendReliable,
|
|
|
|
|
})
|
2022-02-22 07:43:38 -05:00
|
|
|
}
|
|
|
|
|
|
2020-07-07 01:53:45 -04:00
|
|
|
func TestHubIsRegistered(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2025-11-12 07:00:51 -05:00
|
|
|
th := Setup(t).InitBasic(t)
|
2020-07-07 01:53:45 -04:00
|
|
|
|
2023-10-11 07:08:55 -04:00
|
|
|
session, err := th.Service.CreateSession(th.Context, &model.Session{
|
2022-10-06 04:04:21 -04:00
|
|
|
UserId: th.BasicUser.Id,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
mockSuite := &platform_mocks.SuiteIFace{}
|
|
|
|
|
mockSuite.On("GetSession", session.Token).Return(session, nil)
|
2025-09-12 02:35:06 -04:00
|
|
|
mockSuite.On("MFARequired", mock.Anything).Return(nil)
|
2022-10-06 04:04:21 -04:00
|
|
|
th.Suite = mockSuite
|
|
|
|
|
|
2020-07-07 01:53:45 -04:00
|
|
|
s := httptest.NewServer(dummyWebsocketHandler(t))
|
|
|
|
|
defer s.Close()
|
|
|
|
|
|
2022-10-06 04:04:21 -04:00
|
|
|
wc1 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
|
|
|
|
wc2 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
|
|
|
|
wc3 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
2020-07-07 01:53:45 -04:00
|
|
|
defer wc1.Close()
|
|
|
|
|
defer wc2.Close()
|
|
|
|
|
defer wc3.Close()
|
|
|
|
|
|
2023-05-31 06:56:01 -04:00
|
|
|
assert.True(t, th.Service.SessionIsRegistered(*wc1.session.Load()))
|
|
|
|
|
assert.True(t, th.Service.SessionIsRegistered(*wc2.session.Load()))
|
|
|
|
|
assert.True(t, th.Service.SessionIsRegistered(*wc3.session.Load()))
|
2020-07-07 01:53:45 -04:00
|
|
|
|
2023-10-11 07:08:55 -04:00
|
|
|
session4, err := th.Service.CreateSession(th.Context, &model.Session{
|
2020-07-07 01:53:45 -04:00
|
|
|
UserId: th.BasicUser2.Id,
|
|
|
|
|
})
|
2022-10-06 04:04:21 -04:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
assert.False(t, th.Service.SessionIsRegistered(*session4))
|
2020-07-07 01:53:45 -04:00
|
|
|
}
|
|
|
|
|
|
2024-04-30 09:58:55 -04:00
|
|
|
func TestHubWebConnCount(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2025-11-12 07:00:51 -05:00
|
|
|
th := Setup(t).InitBasic(t)
|
2024-04-30 09:58:55 -04:00
|
|
|
|
|
|
|
|
session, err := th.Service.CreateSession(th.Context, &model.Session{
|
|
|
|
|
UserId: th.BasicUser.Id,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
mockSuite := &platform_mocks.SuiteIFace{}
|
|
|
|
|
mockSuite.On("GetSession", session.Token).Return(session, nil)
|
2025-09-12 02:35:06 -04:00
|
|
|
mockSuite.On("MFARequired", mock.Anything).Return(nil)
|
2024-04-30 09:58:55 -04:00
|
|
|
th.Suite = mockSuite
|
|
|
|
|
|
|
|
|
|
s := httptest.NewServer(dummyWebsocketHandler(t))
|
|
|
|
|
defer s.Close()
|
|
|
|
|
|
|
|
|
|
wc1 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
|
|
|
|
wc2 := registerDummyWebConn(t, th, s.Listener.Addr(), session)
|
|
|
|
|
defer wc1.Close()
|
|
|
|
|
|
|
|
|
|
assert.Equal(t, 2, th.Service.WebConnCountForUser(th.BasicUser.Id))
|
|
|
|
|
|
|
|
|
|
wc2.Close()
|
|
|
|
|
|
|
|
|
|
assert.Equal(t, 1, th.Service.WebConnCountForUser(th.BasicUser.Id))
|
|
|
|
|
assert.Equal(t, 0, th.Service.WebConnCountForUser("none"))
|
|
|
|
|
}
|
|
|
|
|
|
2025-05-11 02:30:12 -04:00
|
|
|
// globalIter is a package-level sink assigned inside the
// BenchmarkHubConnIndexIterator* loops so the compiler cannot
// dead-code-eliminate the ForUser/ForChannel calls under measurement.
var globalIter iter.Seq[*WebConn]
|
|
|
|
|
|
|
|
|
|
func BenchmarkHubConnIndexIteratorForUser(b *testing.B) {
|
|
|
|
|
th := Setup(b)
|
|
|
|
|
|
|
|
|
|
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger, false)
|
|
|
|
|
|
|
|
|
|
// User1
|
|
|
|
|
wc1 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: model.NewId(),
|
|
|
|
|
}
|
|
|
|
|
wc1.Active.Store(true)
|
|
|
|
|
wc1.SetConnectionID("conn1")
|
|
|
|
|
wc1.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
// User2
|
|
|
|
|
wc2 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: model.NewId(),
|
|
|
|
|
}
|
|
|
|
|
wc2.Active.Store(true)
|
|
|
|
|
wc2.SetConnectionID("conn2")
|
|
|
|
|
wc2.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
wc3 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: wc2.UserId,
|
|
|
|
|
}
|
|
|
|
|
wc3.Active.Store(false)
|
|
|
|
|
wc3.SetConnectionID("conn3")
|
|
|
|
|
wc3.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
require.NoError(b, connIndex.Add(wc1))
|
|
|
|
|
require.NoError(b, connIndex.Add(wc2))
|
|
|
|
|
require.NoError(b, connIndex.Add(wc3))
|
|
|
|
|
|
|
|
|
|
b.Run("2 users", func(b *testing.B) {
|
2025-07-18 06:54:51 -04:00
|
|
|
for b.Loop() {
|
2025-05-11 02:30:12 -04:00
|
|
|
globalIter = connIndex.ForUser(wc2.UserId)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
wc4 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: wc2.UserId,
|
|
|
|
|
}
|
|
|
|
|
wc4.Active.Store(false)
|
|
|
|
|
wc4.SetConnectionID("conn4")
|
|
|
|
|
wc4.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
require.NoError(b, connIndex.Add(wc4))
|
|
|
|
|
b.Run("3 users", func(b *testing.B) {
|
2025-07-18 06:54:51 -04:00
|
|
|
for b.Loop() {
|
2025-05-11 02:30:12 -04:00
|
|
|
globalIter = connIndex.ForUser(wc2.UserId)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
wc5 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
UserId: wc2.UserId,
|
|
|
|
|
}
|
|
|
|
|
wc5.Active.Store(false)
|
|
|
|
|
wc5.SetConnectionID("conn5")
|
|
|
|
|
wc5.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
require.NoError(b, connIndex.Add(wc5))
|
|
|
|
|
b.Run("4 users", func(b *testing.B) {
|
2025-07-18 06:54:51 -04:00
|
|
|
for b.Loop() {
|
2025-05-11 02:30:12 -04:00
|
|
|
globalIter = connIndex.ForUser(wc2.UserId)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func BenchmarkHubConnIndexIteratorForChannel(b *testing.B) {
|
2025-11-12 07:00:51 -05:00
|
|
|
th := Setup(b).InitBasic(b)
|
2025-05-11 02:30:12 -04:00
|
|
|
|
|
|
|
|
_, err := th.Service.Store.Channel().SaveMember(th.Context, &model.ChannelMember{
|
|
|
|
|
ChannelId: th.BasicChannel.Id,
|
|
|
|
|
UserId: th.BasicUser.Id,
|
|
|
|
|
NotifyProps: model.GetDefaultChannelNotifyProps(),
|
|
|
|
|
SchemeGuest: th.BasicUser.IsGuest(),
|
|
|
|
|
SchemeUser: !th.BasicUser.IsGuest(),
|
|
|
|
|
})
|
|
|
|
|
require.NoError(b, err)
|
|
|
|
|
_, err = th.Service.Store.Channel().SaveMember(th.Context, &model.ChannelMember{
|
|
|
|
|
ChannelId: th.BasicChannel.Id,
|
|
|
|
|
UserId: th.BasicUser2.Id,
|
|
|
|
|
NotifyProps: model.GetDefaultChannelNotifyProps(),
|
|
|
|
|
SchemeGuest: th.BasicUser2.IsGuest(),
|
|
|
|
|
SchemeUser: !th.BasicUser2.IsGuest(),
|
|
|
|
|
})
|
|
|
|
|
require.NoError(b, err)
|
|
|
|
|
|
|
|
|
|
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, true)
|
|
|
|
|
|
|
|
|
|
// User1
|
|
|
|
|
wc1ID := model.NewId()
|
|
|
|
|
wc1 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: th.BasicUser.Id,
|
|
|
|
|
}
|
|
|
|
|
wc1.SetConnectionID(wc1ID)
|
|
|
|
|
wc1.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
// User2
|
|
|
|
|
wc2ID := model.NewId()
|
|
|
|
|
wc2 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: th.BasicUser2.Id,
|
|
|
|
|
}
|
|
|
|
|
wc2.SetConnectionID(wc2ID)
|
|
|
|
|
wc2.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
wc3ID := model.NewId()
|
|
|
|
|
wc3 := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: wc2.UserId,
|
|
|
|
|
}
|
|
|
|
|
wc3.SetConnectionID(wc3ID)
|
|
|
|
|
wc3.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
require.NoError(b, connIndex.Add(wc1))
|
|
|
|
|
require.NoError(b, connIndex.Add(wc2))
|
|
|
|
|
require.NoError(b, connIndex.Add(wc3))
|
|
|
|
|
|
2025-07-18 06:54:51 -04:00
|
|
|
for b.Loop() {
|
2025-05-11 02:30:12 -04:00
|
|
|
globalIter = connIndex.ForChannel(th.BasicChannel.Id)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2020-05-22 01:09:21 -04:00
|
|
|
// Always run this with -benchtime=0.1s
|
|
|
|
|
// See: https://github.com/golang/go/issues/27217.
|
|
|
|
|
func BenchmarkHubConnIndex(b *testing.B) {
|
2025-11-12 07:00:51 -05:00
|
|
|
th := Setup(b).InitBasic(b)
|
2025-02-13 22:49:36 -05:00
|
|
|
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
2020-05-22 01:09:21 -04:00
|
|
|
|
|
|
|
|
// User1
|
|
|
|
|
wc1 := &WebConn{
|
2022-10-06 04:04:21 -04:00
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: model.NewId(),
|
2020-05-22 01:09:21 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// User2
|
|
|
|
|
wc2 := &WebConn{
|
2022-10-06 04:04:21 -04:00
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
UserId: model.NewId(),
|
2020-05-22 01:09:21 -04:00
|
|
|
}
|
|
|
|
|
b.Run("Add", func(b *testing.B) {
|
2025-07-18 06:54:51 -04:00
|
|
|
for b.Loop() {
|
2025-04-29 04:23:53 -04:00
|
|
|
err := connIndex.Add(wc1)
|
|
|
|
|
require.NoError(b, err)
|
|
|
|
|
err = connIndex.Add(wc2)
|
|
|
|
|
require.NoError(b, err)
|
2020-05-22 01:09:21 -04:00
|
|
|
|
2025-07-18 06:54:51 -04:00
|
|
|
// Cleanup
|
2020-05-22 01:09:21 -04:00
|
|
|
b.StopTimer()
|
|
|
|
|
connIndex.Remove(wc1)
|
|
|
|
|
connIndex.Remove(wc2)
|
|
|
|
|
b.StartTimer()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
b.Run("Remove", func(b *testing.B) {
|
2025-07-18 06:54:51 -04:00
|
|
|
for b.Loop() {
|
|
|
|
|
// Setup
|
2020-05-22 01:09:21 -04:00
|
|
|
b.StopTimer()
|
2025-04-29 04:23:53 -04:00
|
|
|
err := connIndex.Add(wc1)
|
|
|
|
|
require.NoError(b, err)
|
|
|
|
|
err = connIndex.Add(wc2)
|
|
|
|
|
require.NoError(b, err)
|
2020-05-22 01:09:21 -04:00
|
|
|
|
2025-07-18 06:54:51 -04:00
|
|
|
b.StartTimer()
|
2020-05-22 01:09:21 -04:00
|
|
|
connIndex.Remove(wc1)
|
|
|
|
|
connIndex.Remove(wc2)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2023-03-24 01:52:37 -04:00
|
|
|
func TestHubConnIndexRemoveMemLeak(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2023-03-24 01:52:37 -04:00
|
|
|
th := Setup(t)
|
|
|
|
|
|
2025-02-13 22:49:36 -05:00
|
|
|
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
2023-03-24 01:52:37 -04:00
|
|
|
|
|
|
|
|
wc := &WebConn{
|
|
|
|
|
Platform: th.Service,
|
|
|
|
|
Suite: th.Suite,
|
|
|
|
|
}
|
|
|
|
|
wc.SetConnectionID(model.NewId())
|
|
|
|
|
wc.SetSession(&model.Session{})
|
|
|
|
|
|
|
|
|
|
ch := make(chan struct{})
|
|
|
|
|
|
|
|
|
|
runtime.SetFinalizer(wc, func(*WebConn) {
|
|
|
|
|
close(ch)
|
|
|
|
|
})
|
|
|
|
|
|
2025-04-29 04:23:53 -04:00
|
|
|
err := connIndex.Add(wc)
|
|
|
|
|
require.NoError(t, err)
|
2023-03-24 01:52:37 -04:00
|
|
|
connIndex.Remove(wc)
|
|
|
|
|
|
|
|
|
|
runtime.GC()
|
|
|
|
|
|
|
|
|
|
timer := time.NewTimer(3 * time.Second)
|
|
|
|
|
defer timer.Stop()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ch:
|
|
|
|
|
case <-timer.C:
|
|
|
|
|
require.Fail(t, "timeout waiting for collection of wc")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assert.Len(t, connIndex.byConnection, 0)
|
|
|
|
|
}
|
|
|
|
|
|
2020-07-07 01:53:45 -04:00
|
|
|
// hubSink is a package-level sink assigned inside BenchmarkGetHubForUserId's
// loop so the GetHubForUserId call cannot be dead-code-eliminated.
var hubSink *Hub
|
2020-05-19 10:25:52 -04:00
|
|
|
|
2020-07-07 01:53:45 -04:00
|
|
|
func BenchmarkGetHubForUserId(b *testing.B) {
|
2025-11-12 07:00:51 -05:00
|
|
|
th := Setup(b).InitBasic(b)
|
2020-05-19 10:25:52 -04:00
|
|
|
|
2025-04-29 04:23:53 -04:00
|
|
|
err := th.Service.Start(nil)
|
|
|
|
|
require.NoError(b, err)
|
2020-05-19 10:25:52 -04:00
|
|
|
|
2025-07-18 06:54:51 -04:00
|
|
|
for b.Loop() {
|
2022-10-06 04:04:21 -04:00
|
|
|
hubSink = th.Service.GetHubForUserId(th.BasicUser.Id)
|
2020-07-07 01:53:45 -04:00
|
|
|
}
|
2020-05-19 10:25:52 -04:00
|
|
|
}
|
2023-01-30 11:10:39 -05:00
|
|
|
|
|
|
|
|
func TestClusterBroadcast(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2023-01-30 11:10:39 -05:00
|
|
|
testCluster := &testlib.FakeClusterInterface{}
|
|
|
|
|
|
|
|
|
|
th := SetupWithCluster(t, testCluster)
|
|
|
|
|
|
|
|
|
|
ev := model.NewWebSocketEvent("test_event", "", "", "", nil, "")
|
|
|
|
|
broadcast := &model.WebsocketBroadcast{
|
|
|
|
|
ContainsSanitizedData: true,
|
|
|
|
|
ContainsSensitiveData: true,
|
|
|
|
|
}
|
|
|
|
|
ev = ev.SetBroadcast(broadcast)
|
|
|
|
|
th.Service.Publish(ev)
|
|
|
|
|
|
|
|
|
|
messages := testCluster.GetMessages()
|
|
|
|
|
|
|
|
|
|
var clusterEvent struct {
|
|
|
|
|
Event string `json:"event"`
|
|
|
|
|
Data map[string]any `json:"data"`
|
|
|
|
|
Broadcast *model.WebsocketBroadcast `json:"broadcast"`
|
|
|
|
|
Sequence int64 `json:"seq"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err := json.Unmarshal(messages[0].Data, &clusterEvent)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.Equal(t, clusterEvent.Broadcast, broadcast)
|
|
|
|
|
}
|
2023-11-08 16:17:07 -05:00
|
|
|
|
|
|
|
|
func TestClusterBroadcastHooks(t *testing.T) {
|
2025-05-30 07:58:26 -04:00
|
|
|
mainHelper.Parallel(t)
|
2023-11-08 16:17:07 -05:00
|
|
|
t.Run("should send broadcast hook information across cluster", func(t *testing.T) {
|
|
|
|
|
testCluster := &testlib.FakeClusterInterface{}
|
|
|
|
|
|
|
|
|
|
th := SetupWithCluster(t, testCluster)
|
|
|
|
|
|
|
|
|
|
hookID := broadcastTest
|
|
|
|
|
hookArgs := map[string]any{
|
|
|
|
|
"makes_changes": true,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
event := model.NewWebSocketEvent(model.WebsocketEventPosted, "", "", "", nil, "")
|
|
|
|
|
event.GetBroadcast().AddHook(hookID, hookArgs)
|
|
|
|
|
|
|
|
|
|
th.Service.Publish(event)
|
|
|
|
|
|
|
|
|
|
received, err := model.WebSocketEventFromJSON(bytes.NewReader(testCluster.GetMessages()[0].Data))
|
|
|
|
|
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
assert.Equal(t, []string{hookID}, received.GetBroadcast().BroadcastHooks)
|
|
|
|
|
assert.Equal(t, []map[string]any{hookArgs}, received.GetBroadcast().BroadcastHookArgs)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
t.Run("should not preserve type information for args", func(t *testing.T) {
|
|
|
|
|
// This behaviour isn't ideal, but this test confirms that it hasn't changed
|
|
|
|
|
testCluster := &testlib.FakeClusterInterface{}
|
|
|
|
|
|
|
|
|
|
th := SetupWithCluster(t, testCluster)
|
|
|
|
|
|
|
|
|
|
hookID := "test_broadcast_hook_with_args"
|
|
|
|
|
hookArgs := map[string]any{
|
|
|
|
|
"user": &model.User{Id: "user1"},
|
|
|
|
|
"array": []string{"a", "b", "c"},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
event := model.NewWebSocketEvent(model.WebsocketEventPosted, "", "", "", nil, "")
|
|
|
|
|
event.GetBroadcast().AddHook(hookID, hookArgs)
|
|
|
|
|
|
|
|
|
|
th.Service.Publish(event)
|
|
|
|
|
|
|
|
|
|
received, err := model.WebSocketEventFromJSON(bytes.NewReader(testCluster.GetMessages()[0].Data))
|
|
|
|
|
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
assert.Equal(t, []string{hookID}, received.GetBroadcast().BroadcastHooks)
|
|
|
|
|
assert.IsType(t, map[string]any{}, received.GetBroadcast().BroadcastHookArgs[0]["user"])
|
|
|
|
|
assert.IsType(t, []any{}, received.GetBroadcast().BroadcastHookArgs[0]["array"])
|
|
|
|
|
})
|
|
|
|
|
}
|