mirror of
https://github.com/mattermost/mattermost.git
synced 2026-05-28 04:35:04 -04:00
MM - 60506 - notify user failed scheduled msgs (#29208)
* MM-60506 - notify user for failed scheduled messages * add unit tests for handleFailedScheduledMessages and send system-bot message * fix vet issues * adjust test for vet report * make sure to send the message to every user there was a failed message --------- Co-authored-by: Harshil Sharma <harshil.sharma@mattermost.com>
This commit is contained in:
parent
5eef415a39
commit
4e0fc5734c
4 changed files with 248 additions and 0 deletions
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/mattermost/mattermost/server/v8/platform/services/telemetry"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/shared/i18n"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/pkg/errors"
|
||||
|
|
@ -374,5 +375,61 @@ func (a *App) handleFailedScheduledPosts(rctx request.CTX, failedScheduledPosts
|
|||
"scheduled_posts_failed",
|
||||
map[string]any{"count": len(failedScheduledPosts)},
|
||||
)
|
||||
a.notifyUserAboutFailedScheduledMessages(rctx, failedScheduledPosts)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) notifyUserAboutFailedScheduledMessages(rctx request.CTX, failedMessages []*model.ScheduledPost) {
|
||||
failedMessagesByUser := aggregateFailMessagesByUser(failedMessages)
|
||||
systemBot, err := a.GetSystemBot(rctx)
|
||||
if err != nil {
|
||||
rctx.Logger().Error("Failed to get the system bot", mlog.Err(err))
|
||||
return
|
||||
}
|
||||
|
||||
for userId, userFailedMessages := range failedMessagesByUser {
|
||||
a.Srv().Go(func(userId string, userFailedMessages []*model.ScheduledPost) func() {
|
||||
return func() {
|
||||
a.notifyUser(rctx, userId, userFailedMessages, systemBot)
|
||||
}
|
||||
}(userId, userFailedMessages))
|
||||
}
|
||||
}
|
||||
|
||||
func aggregateFailMessagesByUser(failedMessages []*model.ScheduledPost) map[string][]*model.ScheduledPost {
|
||||
aggregated := make(map[string][]*model.ScheduledPost)
|
||||
for _, msg := range failedMessages {
|
||||
aggregated[msg.UserId] = append(aggregated[msg.UserId], msg)
|
||||
}
|
||||
return aggregated
|
||||
}
|
||||
|
||||
func (a *App) notifyUser(rctx request.CTX, userId string, userFailedMessages []*model.ScheduledPost, systemBot *model.Bot) {
|
||||
channel, err := a.GetOrCreateDirectChannel(rctx, userId, systemBot.UserId)
|
||||
if err != nil {
|
||||
rctx.Logger().Error("Failed to get or create the DM", mlog.Err(err))
|
||||
return
|
||||
}
|
||||
|
||||
user, err := a.GetUser(userId)
|
||||
if err != nil {
|
||||
rctx.Logger().Error("Failed to get the user", mlog.Err(err))
|
||||
return
|
||||
}
|
||||
|
||||
T := i18n.GetUserTranslations(user.Locale)
|
||||
messageContent := T("app.scheduled_post.failed_messages", map[string]interface{}{
|
||||
"Count": len(userFailedMessages),
|
||||
})
|
||||
|
||||
post := &model.Post{
|
||||
ChannelId: channel.Id,
|
||||
Message: messageContent,
|
||||
Type: model.PostTypeDefault,
|
||||
UserId: systemBot.UserId,
|
||||
}
|
||||
|
||||
if _, err := a.CreatePost(rctx, post, channel, model.CreatePostFlags{SetOnline: true}); err != nil {
|
||||
rctx.Logger().Error("Failed to post notification about failed scheduled messages", mlog.Err(err))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/shared/i18n"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
|
@ -262,3 +263,132 @@ func TestProcessScheduledPosts(t *testing.T) {
|
|||
assert.Greater(t, scheduledPosts[1].ProcessedAt, int64(0))
|
||||
})
|
||||
}
|
||||
|
||||
func TestHandleFailedScheduledPosts(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
t.Run("should handle failed scheduled posts correctly and notify users about failure via system-bot", func(t *testing.T) {
|
||||
rctx := th.Context
|
||||
var err error
|
||||
var appErr *model.AppError
|
||||
var systemBot *model.Bot
|
||||
|
||||
systemBot, appErr = th.App.GetSystemBot(rctx)
|
||||
assert.True(t, appErr == nil)
|
||||
assert.NotNil(t, systemBot)
|
||||
|
||||
user1 := th.BasicUser
|
||||
user2 := th.BasicUser2
|
||||
|
||||
// Create failed scheduled posts: 1 for user1 and 2 for user2
|
||||
failedScheduledPosts := []*model.ScheduledPost{
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Draft: model.Draft{
|
||||
CreateAt: model.GetMillis(),
|
||||
UserId: user1.Id,
|
||||
ChannelId: th.BasicChannel.Id,
|
||||
Message: "Failed scheduled post for user 1",
|
||||
},
|
||||
ErrorCode: model.ScheduledPostErrorUnknownError,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Draft: model.Draft{
|
||||
CreateAt: model.GetMillis(),
|
||||
UserId: user2.Id,
|
||||
ChannelId: th.BasicChannel.Id,
|
||||
Message: "Failed scheduled post 1 for user 2",
|
||||
},
|
||||
ErrorCode: model.ScheduledPostErrorCodeNoChannelPermission,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Draft: model.Draft{
|
||||
CreateAt: model.GetMillis(),
|
||||
UserId: user2.Id,
|
||||
ChannelId: th.BasicChannel.Id,
|
||||
Message: "Failed scheduled post 2 for user 2",
|
||||
},
|
||||
ErrorCode: model.ScheduledPostErrorNoChannelMember,
|
||||
},
|
||||
}
|
||||
|
||||
// Save the failed scheduled posts in the store
|
||||
for _, sp := range failedScheduledPosts {
|
||||
_, err = th.Server.Store().ScheduledPost().CreateScheduledPost(sp)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// Mock WebSocket channels for both of the two users
|
||||
messagesUser1, closeWSUser1 := connectFakeWebSocket(t, th, user1.Id, "", []model.WebsocketEventType{model.WebsocketScheduledPostUpdated})
|
||||
defer closeWSUser1()
|
||||
|
||||
messagesUser2, closeWSUser2 := connectFakeWebSocket(t, th, user2.Id, "", []model.WebsocketEventType{model.WebsocketScheduledPostUpdated})
|
||||
defer closeWSUser2()
|
||||
|
||||
th.App.handleFailedScheduledPosts(rctx, failedScheduledPosts)
|
||||
|
||||
// Validate that the WebSocket events for both users are sent and received correctly
|
||||
for i := 0; i < len(failedScheduledPosts); i++ {
|
||||
var received *model.WebSocketEvent
|
||||
select {
|
||||
case received = <-messagesUser1:
|
||||
if received.GetBroadcast().UserId == user1.Id {
|
||||
assert.Equal(t, model.WebsocketScheduledPostUpdated, received.EventType())
|
||||
}
|
||||
case received = <-messagesUser2:
|
||||
if received.GetBroadcast().UserId == user2.Id {
|
||||
assert.Equal(t, model.WebsocketScheduledPostUpdated, received.EventType())
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Errorf("Timeout while waiting for a WebSocket event for scheduled post %d", i+1)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to check notifications for a specific user
|
||||
checkUserNotification := func(user *model.User, expectedCount int) {
|
||||
// Wait time for notifications to be sent (adding 2 secs because it is run in a separate rountine)
|
||||
var timeout = 2 * time.Second
|
||||
begin := time.Now()
|
||||
channel, appErr := th.App.GetOrCreateDirectChannel(rctx, user.Id, systemBot.UserId)
|
||||
assert.True(t, appErr == nil)
|
||||
|
||||
var posts *model.PostList
|
||||
// wait for the notification to be sent into the channel.
|
||||
// idea is to get the channel and try to find posts, if not, wait 100ms and try again until timout or there is posts lengh
|
||||
for {
|
||||
if time.Since(begin) > timeout {
|
||||
break
|
||||
}
|
||||
posts, appErr = th.App.GetPosts(channel.Id, 0, 10)
|
||||
assert.True(t, appErr == nil)
|
||||
if len(posts.Posts) > 0 {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
assert.NotEmpty(t, posts.Posts, "Expected notification for user %s to have been sent", user.Id)
|
||||
|
||||
// Validate the actual content of the notification posted (to include count verification)
|
||||
T := i18n.GetUserTranslations(user.Locale)
|
||||
messageContent := T("app.scheduled_post.failed_messages", map[string]interface{}{
|
||||
"Count": expectedCount,
|
||||
})
|
||||
|
||||
found := false
|
||||
for _, post := range posts.Posts {
|
||||
if post.UserId == systemBot.UserId && post.Message == messageContent {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.True(t, found, "Notification post not found for user %s with expected count %d", user.Id, expectedCount)
|
||||
}
|
||||
|
||||
// Check notifications sent for failed messages for both users
|
||||
checkUserNotification(user1, 1)
|
||||
checkUserNotification(user2, 2)
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -753,3 +754,56 @@ func TestDeleteScheduledPost(t *testing.T) {
|
|||
require.Nil(t, deletedScheduledPost)
|
||||
})
|
||||
}
|
||||
|
||||
func TestPublishScheduledPostEvent(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
userID := th.BasicUser.Id
|
||||
|
||||
messages, closeWS := connectFakeWebSocket(t, th, userID, "", []model.WebsocketEventType{model.WebsocketScheduledPostCreated})
|
||||
defer closeWS()
|
||||
|
||||
t.Run("should publish ws event when scheduledPost is valid", func(t *testing.T) {
|
||||
scheduledPost := &model.ScheduledPost{
|
||||
Draft: model.Draft{
|
||||
CreateAt: model.GetMillis(),
|
||||
UserId: userID,
|
||||
ChannelId: th.BasicChannel.Id,
|
||||
Message: "this is a scheduled post",
|
||||
},
|
||||
ScheduledAt: model.GetMillis() + 100000,
|
||||
}
|
||||
|
||||
th.App.PublishScheduledPostEvent(th.Context, model.WebsocketScheduledPostCreated, scheduledPost, "fake_connection_id")
|
||||
|
||||
received := <-messages
|
||||
require.Equal(t, model.WebsocketScheduledPostCreated, received.EventType())
|
||||
require.Equal(t, userID, received.GetBroadcast().UserId)
|
||||
|
||||
scheduledPostJSON, err := json.Marshal(scheduledPost)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, string(scheduledPostJSON), received.GetData()["scheduledPost"])
|
||||
})
|
||||
|
||||
t.Run("should handle nil scheduledPost scenario", func(t *testing.T) {
|
||||
// Drain any existing messages
|
||||
drained := false
|
||||
for !drained {
|
||||
select {
|
||||
case <-messages:
|
||||
default:
|
||||
drained = true
|
||||
}
|
||||
}
|
||||
|
||||
th.App.PublishScheduledPostEvent(th.Context, model.WebsocketScheduledPostCreated, nil, "fake_connection_id")
|
||||
|
||||
select {
|
||||
case msg := <-messages:
|
||||
t.Errorf("Expected no message, but got one: %+v", msg)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// there was no message sent to the channel, so test is successful
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6542,6 +6542,13 @@
|
|||
"id": "app.save_scheduled_post.save.app_error",
|
||||
"translation": "Error occurred saving the scheduled post."
|
||||
},
|
||||
{
|
||||
"id": "app.scheduled_post.failed_messages",
|
||||
"translation": {
|
||||
"one": "Failed to send {{.Count}} scheduled post.",
|
||||
"other": "Failed to send {{.Count}} scheduled posts."
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "app.scheme.delete.app_error",
|
||||
"translation": "Unable to delete this scheme."
|
||||
|
|
|
|||
Loading…
Reference in a new issue