mirror of
https://github.com/mattermost/mattermost.git
synced 2026-05-27 12:13:29 -04:00
Some checks are pending
API / build (push) Waiting to run
Server CI / Compute Go Version (push) Waiting to run
Server CI / Check mocks (push) Blocked by required conditions
Server CI / Check go mod tidy (push) Blocked by required conditions
Server CI / check-style (push) Blocked by required conditions
Server CI / Check serialization methods for hot structs (push) Blocked by required conditions
Server CI / Vet API (push) Blocked by required conditions
Server CI / Check migration files (push) Blocked by required conditions
Server CI / Generate email templates (push) Blocked by required conditions
Server CI / Check store layers (push) Blocked by required conditions
Server CI / Check mmctl docs (push) Blocked by required conditions
Server CI / Postgres with binary parameters (push) Blocked by required conditions
Server CI / Postgres (shard 0) (push) Blocked by required conditions
Server CI / Postgres (shard 1) (push) Blocked by required conditions
Server CI / Postgres (shard 2) (push) Blocked by required conditions
Server CI / Postgres (shard 3) (push) Blocked by required conditions
Server CI / Merge Postgres Test Results (push) Blocked by required conditions
Server CI / Postgres (FIPS) (push) Blocked by required conditions
Server CI / Generate Test Coverage (push) Blocked by required conditions
Server CI / Run mmctl tests (push) Blocked by required conditions
Server CI / Run mmctl tests (FIPS) (push) Blocked by required conditions
Server CI / Build mattermost server app (push) Blocked by required conditions
Tools CI / check-style (mattermost-govet) (push) Waiting to run
Tools CI / Test (mattermost-govet) (push) Waiting to run
Web App CI / check-lint (push) Waiting to run
Web App CI / check-i18n (push) Blocked by required conditions
Web App CI / check-external-links (push) Blocked by required conditions
Web App CI / check-types (push) Blocked by required conditions
Web App CI / test (platform) (push) Blocked by required conditions
Web App CI / test (mattermost-redux) (push) Blocked by required conditions
Web App CI / test (channels shard 1/4) (push) Blocked by required conditions
Web App CI / test (channels shard 2/4) (push) Blocked by required conditions
Web App CI / test (channels shard 3/4) (push) Blocked by required conditions
Web App CI / test (channels shard 4/4) (push) Blocked by required conditions
Web App CI / upload-coverage (push) Blocked by required conditions
Web App CI / build (push) Blocked by required conditions
* Use multi-level logging for shared channel and remote cluster service errors Service-specific log levels (LvlRemoteClusterServiceError, LvlSharedChannelServiceError) were hidden from the main log by default, requiring explicit configuration to see them. Switch all call sites to use LogM with multi-level combos so each log line is attributed to both the standard level (error/warn) and the service-specific level. This surfaces errors in the main log while preserving the ability to isolate them into dedicated files. Each instance was reviewed and either kept as error (DB failures, security issues, config errors, exhausted retries on critical data) or downgraded to warn (transient network failures, remote-side reported errors with retry logic, non-critical data like profile images, reactions, acknowledgements, and status).
152 lines
4.7 KiB
Go
152 lines
4.7 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package remotecluster
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
|
)
|
|
|
|
type SendProfileImageResultFunc func(userId string, rc *model.RemoteCluster, resp *Response, err error)
|
|
|
|
type sendProfileImageTask struct {
|
|
rc *model.RemoteCluster
|
|
userID string
|
|
provider ProfileImageProvider
|
|
f SendProfileImageResultFunc
|
|
}
|
|
|
|
type ProfileImageProvider interface {
|
|
GetProfileImage(user *model.User) ([]byte, bool, *model.AppError)
|
|
}
|
|
|
|
// SendProfileImage asynchronously sends a user's profile image to a remote cluster.
|
|
//
|
|
// `ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a
|
|
// BufferFullError if the task cannot be enqueued before the timeout. A background context will block indefinitely.
|
|
//
|
|
// Nil or error return indicates success or failure of task enqueue only.
|
|
//
|
|
// An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the
|
|
// callback is regarding image delivery only. The `resp` contains the decoded bytes returned from the remote.
|
|
// If a callback is provided it should return quickly.
|
|
func (rcs *Service) SendProfileImage(ctx context.Context, userID string, rc *model.RemoteCluster, provider ProfileImageProvider, f SendProfileImageResultFunc) error {
|
|
task := sendProfileImageTask{
|
|
rc: rc,
|
|
userID: userID,
|
|
provider: provider,
|
|
f: f,
|
|
}
|
|
return rcs.enqueueTask(ctx, rc.RemoteId, task)
|
|
}
|
|
|
|
// sendProfileImage is called when a sendProfileImageTask is popped from the send channel.
|
|
func (rcs *Service) sendProfileImage(task sendProfileImageTask) {
|
|
err := rcs.sendProfileImageToRemote(SendTimeout, task)
|
|
var response Response
|
|
|
|
if err != nil {
|
|
rcs.server.Log().LogM(mlog.MlvlRemoteClusterServiceWarn, "Remote Cluster send profile image failed",
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
mlog.String("UserId", task.userID),
|
|
mlog.Err(err),
|
|
)
|
|
response.Status = ResponseStatusFail
|
|
response.Err = err.Error()
|
|
} else {
|
|
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "Remote Cluster profile image sent successfully",
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
mlog.String("UserId", task.userID),
|
|
)
|
|
response.Status = ResponseStatusOK
|
|
}
|
|
|
|
// If callback provided then call it with the results.
|
|
if task.f != nil {
|
|
task.f(task.userID, task.rc, &response, err)
|
|
}
|
|
}
|
|
|
|
func (rcs *Service) sendProfileImageToRemote(timeout time.Duration, task sendProfileImageTask) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
if metrics := rcs.server.GetMetrics(); metrics != nil {
|
|
metrics.ObserveSharedChannelsSyncSendStepDuration(task.rc.RemoteId, "ProfileImages", time.Since(start).Seconds())
|
|
}
|
|
}()
|
|
|
|
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "sending profile image to remote...",
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
mlog.String("UserId", task.userID),
|
|
)
|
|
|
|
user, err := rcs.server.GetStore().User().Get(context.Background(), task.userID)
|
|
if err != nil {
|
|
return fmt.Errorf("error fetching user while sending profile image to remote %s: %w", task.rc.RemoteId, err)
|
|
}
|
|
|
|
img, _, appErr := task.provider.GetProfileImage(user) // get Reader for the file
|
|
if appErr != nil {
|
|
return fmt.Errorf("error fetching profile image for user (%s) while sending to remote %s: %w", task.userID, task.rc.RemoteId, appErr)
|
|
}
|
|
|
|
u, err := url.Parse(task.rc.SiteURL)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid siteURL while sending file to remote %s: %w", task.rc.RemoteId, err)
|
|
}
|
|
u.Path = path.Join(u.Path, model.APIURLSuffix, "remotecluster", task.userID, "image")
|
|
|
|
body := &bytes.Buffer{}
|
|
writer := multipart.NewWriter(body)
|
|
|
|
part, err := writer.CreateFormFile("image", "profile.png")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err = io.Copy(part, bytes.NewBuffer(img)); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = writer.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", u.String(), body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", writer.FormDataContentType())
|
|
req.Header.Set(model.HeaderRemoteclusterId, task.rc.RemoteId)
|
|
req.Header.Set(model.HeaderRemoteclusterToken, task.rc.RemoteToken)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
resp, err := rcs.httpClient.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
_, err = io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("unexpected response: %d - %s", resp.StatusCode, resp.Status)
|
|
}
|
|
return nil
|
|
}
|