mirror of
https://github.com/mattermost/mattermost.git
synced 2026-05-28 04:35:04 -04:00
Some checks are pending
API / build (push) Waiting to run
Server CI / Compute Go Version (push) Waiting to run
Server CI / Check mocks (push) Blocked by required conditions
Server CI / Check go mod tidy (push) Blocked by required conditions
Server CI / check-style (push) Blocked by required conditions
Server CI / Check serialization methods for hot structs (push) Blocked by required conditions
Server CI / Vet API (push) Blocked by required conditions
Server CI / Check migration files (push) Blocked by required conditions
Server CI / Generate email templates (push) Blocked by required conditions
Server CI / Check store layers (push) Blocked by required conditions
Server CI / Check mmctl docs (push) Blocked by required conditions
Server CI / Postgres with binary parameters (push) Blocked by required conditions
Server CI / Postgres (push) Blocked by required conditions
Server CI / Postgres (FIPS) (push) Blocked by required conditions
Server CI / Generate Test Coverage (push) Blocked by required conditions
Server CI / Run mmctl tests (push) Blocked by required conditions
Server CI / Run mmctl tests (FIPS) (push) Blocked by required conditions
Server CI / Build mattermost server app (push) Blocked by required conditions
Web App CI / check-lint (push) Waiting to run
Web App CI / check-i18n (push) Blocked by required conditions
Web App CI / check-external-links (push) Blocked by required conditions
Web App CI / check-types (push) Blocked by required conditions
Web App CI / test (platform) (push) Blocked by required conditions
Web App CI / test (mattermost-redux) (push) Blocked by required conditions
Web App CI / test (channels shard 1/4) (push) Blocked by required conditions
Web App CI / test (channels shard 2/4) (push) Blocked by required conditions
Web App CI / test (channels shard 3/4) (push) Blocked by required conditions
Web App CI / test (channels shard 4/4) (push) Blocked by required conditions
Web App CI / upload-coverage (push) Blocked by required conditions
Web App CI / build (push) Blocked by required conditions
* Add --workers flag to mmctl import process to control concurrency The bulk import worker count was hardcoded to runtime.NumCPU(), causing high database load on the master during imports on live systems. This is particularly impactful for incremental Slack imports where all users are re-imported each time, generating 8-15 DB operations per user against the master (due to LockToMaster). The new --workers flag allows administrators to reduce concurrency (e.g., --workers 1) to minimize impact on live users at the cost of longer import duration. Defaults to 0 which preserves the existing runtime.NumCPU() behavior. * Add max workers limit, capped at CPU Count * 4
153 lines
4.9 KiB
Go
153 lines
4.9 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package import_process
|
|
|
|
import (
|
|
"archive/zip"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
"github.com/mattermost/mattermost/server/public/shared/configservice"
|
|
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
|
"github.com/mattermost/mattermost/server/public/shared/request"
|
|
"github.com/mattermost/mattermost/server/v8/channels/app/imports"
|
|
"github.com/mattermost/mattermost/server/v8/channels/jobs"
|
|
"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
|
|
)
|
|
|
|
type AppIface interface {
|
|
configservice.ConfigService
|
|
RemoveFile(path string) *model.AppError
|
|
FileExists(path string) (bool, *model.AppError)
|
|
FileSize(path string) (int64, *model.AppError)
|
|
FileReader(path string) (filestore.ReadCloseSeeker, *model.AppError)
|
|
BulkImportWithPath(rctx request.CTX, jsonlReader io.Reader, attachmentsReader *zip.Reader, dryRun, extractContent bool, workers int, importPath string) (int, *model.AppError)
|
|
Log() *mlog.Logger
|
|
}
|
|
|
|
func MakeWorker(jobServer *jobs.JobServer, app AppIface) *jobs.SimpleWorker {
|
|
const workerName = "ImportProcess"
|
|
|
|
appContext := request.EmptyContext(jobServer.Logger())
|
|
isEnabled := func(cfg *model.Config) bool {
|
|
return true
|
|
}
|
|
execute := func(logger mlog.LoggerIFace, job *model.Job) error {
|
|
defer jobServer.HandleJobPanic(logger, job)
|
|
|
|
importFileName, ok := job.Data["import_file"]
|
|
if !ok {
|
|
return model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.missing_file", nil, "", http.StatusBadRequest)
|
|
}
|
|
|
|
var importFilePath string
|
|
var importFileSize int64
|
|
var importFile filestore.ReadCloseSeeker
|
|
if job.Data["local_mode"] == "true" {
|
|
// We simply read the file from the local filesystem.
|
|
info, err := os.Stat(importFileName)
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
return fmt.Errorf("file %s doesn't exist.", importFileName)
|
|
}
|
|
|
|
importFileSize = info.Size()
|
|
|
|
importFile, err = os.Open(importFileName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer importFile.Close()
|
|
} else {
|
|
importFilePath = filepath.Join(*app.Config().ImportSettings.Directory, importFileName)
|
|
if ok, err := app.FileExists(importFilePath); err != nil {
|
|
return err
|
|
} else if !ok {
|
|
return model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.file_exists", nil, "", http.StatusBadRequest)
|
|
}
|
|
|
|
var appErr *model.AppError
|
|
importFileSize, appErr = app.FileSize(importFilePath)
|
|
if appErr != nil {
|
|
return appErr
|
|
}
|
|
|
|
importFile, appErr = app.FileReader(importFilePath)
|
|
if appErr != nil {
|
|
return appErr
|
|
}
|
|
defer importFile.Close()
|
|
|
|
// The import is a long running operation, try to cancel any timeouts attached to the reader.
|
|
type TimeoutCanceler interface{ CancelTimeout() bool }
|
|
if tc, ok := importFile.(TimeoutCanceler); ok {
|
|
if !tc.CancelTimeout() {
|
|
appContext.Logger().Warn("Could not cancel the timeout for the file reader. The import may fail due to a timeout.")
|
|
}
|
|
}
|
|
}
|
|
|
|
importZipReader, err := zip.NewReader(importFile.(io.ReaderAt), importFileSize)
|
|
if err != nil {
|
|
return model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.open_file", nil, "", http.StatusInternalServerError).Wrap(err)
|
|
}
|
|
|
|
// find JSONL import file.
|
|
var jsonZipFile *zip.File
|
|
for _, f := range importZipReader.File {
|
|
if imports.IsRootJsonlFile(f.Name) {
|
|
jsonZipFile = f
|
|
break
|
|
}
|
|
}
|
|
if jsonZipFile == nil {
|
|
return model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.missing_jsonl", nil, "jsonFile was nil", http.StatusBadRequest)
|
|
}
|
|
|
|
// avoid "zip slip"
|
|
if strings.Contains(jsonZipFile.Name, "..") {
|
|
return model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.open_file", nil, "jsonFilePath contains path traversal", http.StatusForbidden)
|
|
}
|
|
|
|
jsonFile, err := jsonZipFile.Open()
|
|
if err != nil {
|
|
return model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.open_file", nil, "", http.StatusInternalServerError).Wrap(err)
|
|
}
|
|
defer jsonFile.Close()
|
|
|
|
extractContent := job.Data["extract_content"] == "true"
|
|
|
|
numWorkers := runtime.NumCPU()
|
|
if workersStr, ok := job.Data["workers"]; ok {
|
|
if n, err := strconv.Atoi(workersStr); err == nil && n > 0 {
|
|
numWorkers = n
|
|
}
|
|
}
|
|
|
|
// do the actual import.
|
|
lineNumber, appErr := app.BulkImportWithPath(appContext, jsonFile, importZipReader, false, extractContent, numWorkers, model.ExportDataDir)
|
|
if appErr != nil {
|
|
job.Data["line_number"] = strconv.Itoa(lineNumber)
|
|
return appErr
|
|
}
|
|
|
|
// No need to remove the file in local mode.
|
|
if job.Data["local_mode"] != "true" {
|
|
// remove import file when done.
|
|
if appErr := app.RemoveFile(importFilePath); appErr != nil {
|
|
return appErr
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
worker := jobs.NewSimpleWorker(workerName, jobServer, execute, isEnabled)
|
|
return worker
|
|
}
|