2021-04-01 13:44:56 -04:00
|
|
|
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
|
|
|
// See LICENSE.txt for license information.
|
|
|
|
|
|
|
|
|
|
package remotecluster
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
2022-08-09 07:25:46 -04:00
|
|
|
"io"
|
2021-04-01 13:44:56 -04:00
|
|
|
"net/http"
|
|
|
|
|
"net/url"
|
|
|
|
|
"os"
|
|
|
|
|
"path"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/wiggin77/merror"
|
|
|
|
|
|
2023-06-11 01:24:35 -04:00
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
|
|
|
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
2021-04-01 13:44:56 -04:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// SendMsgResultFunc is the optional callback accepted by BroadcastMsg and SendMsg.
// It is invoked once per send attempt with the message that was sent, the remote
// cluster it was addressed to, a non-nil response, and the error (if any) hit while
// building the request, sending it, or decoding the remote's reply.
type SendMsgResultFunc func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response, err error)
|
|
|
|
|
|
|
|
|
|
type sendMsgTask struct {
|
|
|
|
|
rc *model.RemoteCluster
|
|
|
|
|
msg model.RemoteClusterMsg
|
|
|
|
|
f SendMsgResultFunc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// BroadcastMsg asynchronously sends a message to all remote clusters interested in the message's topic.
|
|
|
|
|
//
|
|
|
|
|
// `ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a
|
|
|
|
|
// BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.
|
|
|
|
|
//
|
|
|
|
|
// An optional callback can be provided that receives the success or fail result of sending to each remote cluster.
|
|
|
|
|
// Success or fail is regarding message delivery only. If a callback is provided it should return quickly.
|
|
|
|
|
func (rcs *Service) BroadcastMsg(ctx context.Context, msg model.RemoteClusterMsg, f SendMsgResultFunc) error {
|
|
|
|
|
// get list of interested remotes.
|
|
|
|
|
filter := model.RemoteClusterQueryFilter{
|
|
|
|
|
Topic: msg.Topic,
|
|
|
|
|
}
|
2024-07-04 04:35:26 -04:00
|
|
|
list, err := rcs.server.GetStore().RemoteCluster().GetAll(0, 999999, filter)
|
2021-04-01 13:44:56 -04:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
errs := merror.New()
|
|
|
|
|
|
|
|
|
|
for _, rc := range list {
|
|
|
|
|
if err := rcs.SendMsg(ctx, msg, rc, f); err != nil {
|
|
|
|
|
errs.Append(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return errs.ErrorOrNil()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SendMsg asynchronously sends a message to a remote cluster.
|
|
|
|
|
//
|
|
|
|
|
// `ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a
|
|
|
|
|
// BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.
|
|
|
|
|
//
|
|
|
|
|
// Nil or error return indicates success or failure of message enqueue only.
|
|
|
|
|
//
|
|
|
|
|
// An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the
|
|
|
|
|
// callback is regarding response decoding only. The `resp` contains the decoded bytes returned from the remote.
|
|
|
|
|
// If a callback is provided it should return quickly.
|
|
|
|
|
func (rcs *Service) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error {
|
|
|
|
|
task := sendMsgTask{
|
|
|
|
|
rc: rc,
|
|
|
|
|
msg: msg,
|
|
|
|
|
f: f,
|
|
|
|
|
}
|
|
|
|
|
return rcs.enqueueTask(ctx, rc.RemoteId, task)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sendMsg is called when a sendMsgTask is popped from the send channel.
|
|
|
|
|
func (rcs *Service) sendMsg(task sendMsgTask) {
|
|
|
|
|
var errResp error
|
|
|
|
|
var response Response
|
|
|
|
|
|
|
|
|
|
// Ensure a panic from the callback does not exit the pool goroutine.
|
|
|
|
|
defer func() {
|
|
|
|
|
if errResp != nil {
|
|
|
|
|
response.Err = errResp.Error()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If callback provided then call it with the results.
|
|
|
|
|
if task.f != nil {
|
|
|
|
|
task.f(task.msg, task.rc, &response, errResp)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
frame := &model.RemoteClusterFrame{
|
|
|
|
|
RemoteId: task.rc.RemoteId,
|
|
|
|
|
Msg: task.msg,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
u, err := url.Parse(task.rc.SiteURL)
|
|
|
|
|
if err != nil {
|
MM-68204: Use multi-level logging for shared channel and remote cluster service errors (#35949)
* Use multi-level logging for shared channel and remote cluster service errors
Service-specific log levels (LvlRemoteClusterServiceError, LvlSharedChannelServiceError)
were hidden from the main log by default, requiring explicit configuration to see them.
Switch all call sites to use LogM with multi-level combos so each log line is attributed
to both the standard level (error/warn) and the service-specific level. This surfaces
errors in the main log while preserving the ability to isolate them into dedicated files.
Each instance was reviewed and either kept as error (DB failures, security issues, config
errors, exhausted retries on critical data) or downgraded to warn (transient network
failures, remote-side reported errors with retry logic, non-critical data like profile
images, reactions, acknowledgements, and status).
2026-04-06 12:17:47 -04:00
|
|
|
rcs.server.Log().LogM(mlog.MlvlRemoteClusterServiceError, "Invalid siteURL while sending message to remote",
|
2021-04-01 13:44:56 -04:00
|
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
|
|
|
mlog.String("msgId", task.msg.Id),
|
|
|
|
|
mlog.Err(err),
|
|
|
|
|
)
|
|
|
|
|
errResp = err
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
u.Path = path.Join(u.Path, SendMsgURL)
|
|
|
|
|
|
|
|
|
|
respJSON, err := rcs.sendFrameToRemote(SendTimeout, task.rc, frame, u.String())
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
MM-68204: Use multi-level logging for shared channel and remote cluster service errors (#35949)
* Use multi-level logging for shared channel and remote cluster service errors
Service-specific log levels (LvlRemoteClusterServiceError, LvlSharedChannelServiceError)
were hidden from the main log by default, requiring explicit configuration to see them.
Switch all call sites to use LogM with multi-level combos so each log line is attributed
to both the standard level (error/warn) and the service-specific level. This surfaces
errors in the main log while preserving the ability to isolate them into dedicated files.
Each instance was reviewed and either kept as error (DB failures, security issues, config
errors, exhausted retries on critical data) or downgraded to warn (transient network
failures, remote-side reported errors with retry logic, non-critical data like profile
images, reactions, acknowledgements, and status).
2026-04-06 12:17:47 -04:00
|
|
|
rcs.server.Log().LogM(mlog.MlvlRemoteClusterServiceError, "Remote Cluster send message failed",
|
2021-04-01 13:44:56 -04:00
|
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
|
|
|
mlog.String("msgId", task.msg.Id),
|
|
|
|
|
mlog.Err(err),
|
|
|
|
|
)
|
|
|
|
|
errResp = err
|
|
|
|
|
} else {
|
2022-08-24 03:10:56 -04:00
|
|
|
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "Remote Cluster message sent successfully",
|
2021-04-01 13:44:56 -04:00
|
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
|
|
|
mlog.String("msgId", task.msg.Id),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if err = json.Unmarshal(respJSON, &response); err != nil {
|
2022-08-24 03:10:56 -04:00
|
|
|
rcs.server.Log().Error("Invalid response sending message to remote cluster",
|
2021-04-01 13:44:56 -04:00
|
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
|
|
|
mlog.Err(err),
|
|
|
|
|
)
|
|
|
|
|
errResp = err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rcs *Service) sendFrameToRemote(timeout time.Duration, rc *model.RemoteCluster, frame *model.RemoteClusterFrame, url string) ([]byte, error) {
|
|
|
|
|
body, err := json.Marshal(frame)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
2021-07-12 14:05:36 -04:00
|
|
|
req.Header.Set(model.HeaderRemoteclusterId, rc.RemoteId)
|
|
|
|
|
req.Header.Set(model.HeaderRemoteclusterToken, rc.RemoteToken)
|
2021-04-01 13:44:56 -04:00
|
|
|
|
|
|
|
|
resp, err := rcs.httpClient.Do(req.WithContext(ctx))
|
|
|
|
|
if metrics := rcs.server.GetMetrics(); metrics != nil {
|
|
|
|
|
if err != nil || resp.StatusCode != http.StatusOK {
|
|
|
|
|
metrics.IncrementRemoteClusterMsgErrorsCounter(frame.RemoteId, os.IsTimeout(err))
|
|
|
|
|
} else {
|
|
|
|
|
metrics.IncrementRemoteClusterMsgSentCounter(frame.RemoteId)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
defer resp.Body.Close()
|
2022-08-09 07:25:46 -04:00
|
|
|
body, err = io.ReadAll(resp.Body)
|
2021-04-01 13:44:56 -04:00
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
|
return body, fmt.Errorf("unexpected response: %d - %s", resp.StatusCode, resp.Status)
|
|
|
|
|
}
|
|
|
|
|
return body, nil
|
|
|
|
|
}
|