mirror of
https://github.com/mattermost/mattermost.git
synced 2026-05-28 04:35:04 -04:00
Add --workers flag to mmctl import process to control concurrency (#35582)
Some checks are pending
API / build (push) Waiting to run
Server CI / Compute Go Version (push) Waiting to run
Server CI / Check mocks (push) Blocked by required conditions
Server CI / Check go mod tidy (push) Blocked by required conditions
Server CI / check-style (push) Blocked by required conditions
Server CI / Check serialization methods for hot structs (push) Blocked by required conditions
Server CI / Vet API (push) Blocked by required conditions
Server CI / Check migration files (push) Blocked by required conditions
Server CI / Generate email templates (push) Blocked by required conditions
Server CI / Check store layers (push) Blocked by required conditions
Server CI / Check mmctl docs (push) Blocked by required conditions
Server CI / Postgres with binary parameters (push) Blocked by required conditions
Server CI / Postgres (push) Blocked by required conditions
Server CI / Postgres (FIPS) (push) Blocked by required conditions
Server CI / Generate Test Coverage (push) Blocked by required conditions
Server CI / Run mmctl tests (push) Blocked by required conditions
Server CI / Run mmctl tests (FIPS) (push) Blocked by required conditions
Server CI / Build mattermost server app (push) Blocked by required conditions
Web App CI / check-lint (push) Waiting to run
Web App CI / check-i18n (push) Blocked by required conditions
Web App CI / check-external-links (push) Blocked by required conditions
Web App CI / check-types (push) Blocked by required conditions
Web App CI / test (platform) (push) Blocked by required conditions
Web App CI / test (mattermost-redux) (push) Blocked by required conditions
Web App CI / test (channels shard 1/4) (push) Blocked by required conditions
Web App CI / test (channels shard 2/4) (push) Blocked by required conditions
Web App CI / test (channels shard 3/4) (push) Blocked by required conditions
Web App CI / test (channels shard 4/4) (push) Blocked by required conditions
Web App CI / upload-coverage (push) Blocked by required conditions
Web App CI / build (push) Blocked by required conditions
Some checks are pending
API / build (push) Waiting to run
Server CI / Compute Go Version (push) Waiting to run
Server CI / Check mocks (push) Blocked by required conditions
Server CI / Check go mod tidy (push) Blocked by required conditions
Server CI / check-style (push) Blocked by required conditions
Server CI / Check serialization methods for hot structs (push) Blocked by required conditions
Server CI / Vet API (push) Blocked by required conditions
Server CI / Check migration files (push) Blocked by required conditions
Server CI / Generate email templates (push) Blocked by required conditions
Server CI / Check store layers (push) Blocked by required conditions
Server CI / Check mmctl docs (push) Blocked by required conditions
Server CI / Postgres with binary parameters (push) Blocked by required conditions
Server CI / Postgres (push) Blocked by required conditions
Server CI / Postgres (FIPS) (push) Blocked by required conditions
Server CI / Generate Test Coverage (push) Blocked by required conditions
Server CI / Run mmctl tests (push) Blocked by required conditions
Server CI / Run mmctl tests (FIPS) (push) Blocked by required conditions
Server CI / Build mattermost server app (push) Blocked by required conditions
Web App CI / check-lint (push) Waiting to run
Web App CI / check-i18n (push) Blocked by required conditions
Web App CI / check-external-links (push) Blocked by required conditions
Web App CI / check-types (push) Blocked by required conditions
Web App CI / test (platform) (push) Blocked by required conditions
Web App CI / test (mattermost-redux) (push) Blocked by required conditions
Web App CI / test (channels shard 1/4) (push) Blocked by required conditions
Web App CI / test (channels shard 2/4) (push) Blocked by required conditions
Web App CI / test (channels shard 3/4) (push) Blocked by required conditions
Web App CI / test (channels shard 4/4) (push) Blocked by required conditions
Web App CI / upload-coverage (push) Blocked by required conditions
Web App CI / build (push) Blocked by required conditions
* Add --workers flag to mmctl import process to control concurrency The bulk import worker count was hardcoded to runtime.NumCPU(), causing high database load on the master during imports on live systems. This is particularly impactful for incremental Slack imports where all users are re-imported each time, generating 8-15 DB operations per user against the master (due to LockToMaster). The new --workers flag allows administrators to reduce concurrency (e.g., --workers 1) to minimize impact on live users at the cost of longer import duration. Defaults to 0 which preserves the existing runtime.NumCPU() behavior. * Add max workers limit, capped at CPU Count * 4
This commit is contained in:
parent
33437b7ef6
commit
3e38cbc5ca
3 changed files with 106 additions and 26 deletions
|
|
@ -124,8 +124,16 @@ func MakeWorker(jobServer *jobs.JobServer, app AppIface) *jobs.SimpleWorker {
|
|||
defer jsonFile.Close()
|
||||
|
||||
extractContent := job.Data["extract_content"] == "true"
|
||||
|
||||
numWorkers := runtime.NumCPU()
|
||||
if workersStr, ok := job.Data["workers"]; ok {
|
||||
if n, err := strconv.Atoi(workersStr); err == nil && n > 0 {
|
||||
numWorkers = n
|
||||
}
|
||||
}
|
||||
|
||||
// do the actual import.
|
||||
lineNumber, appErr := app.BulkImportWithPath(appContext, jsonFile, importZipReader, false, extractContent, runtime.NumCPU(), model.ExportDataDir)
|
||||
lineNumber, appErr := app.BulkImportWithPath(appContext, jsonFile, importZipReader, false, extractContent, numWorkers, model.ExportDataDir)
|
||||
if appErr != nil {
|
||||
job.Data["line_number"] = strconv.Itoa(lineNumber)
|
||||
return appErr
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
|
|
@ -123,6 +124,7 @@ func init() {
|
|||
|
||||
ImportProcessCmd.Flags().Bool("bypass-upload", false, "If this is set, the file is not processed from the server, but rather directly read from the filesystem. Works only in --local mode.")
|
||||
ImportProcessCmd.Flags().Bool("extract-content", true, "If this is set, document attachments will be extracted and indexed during the import process. It is advised to disable it to improve performance.")
|
||||
ImportProcessCmd.Flags().Int("workers", 0, "The number of concurrent import worker goroutines. Controls database load during import. When set to 0 (default), uses the number of CPUs available. Maximum allowed is 4x the CPU count.")
|
||||
|
||||
ImportListCmd.AddCommand(
|
||||
ImportListAvailableCmd,
|
||||
|
|
@ -310,14 +312,25 @@ func importProcessCmdF(c client.Client, command *cobra.Command, args []string) e
|
|||
}
|
||||
|
||||
extractContent, _ := command.Flags().GetBool("extract-content")
|
||||
workers, _ := command.Flags().GetInt("workers")
|
||||
|
||||
maxWorkers := runtime.NumCPU() * 4
|
||||
if workers > maxWorkers {
|
||||
return fmt.Errorf("workers value %d exceeds maximum allowed (%d = 4 * CPU count)", workers, maxWorkers)
|
||||
}
|
||||
|
||||
jobData := map[string]string{
|
||||
"import_file": importFile,
|
||||
"local_mode": strconv.FormatBool(isLocal && bypassUpload),
|
||||
"extract_content": strconv.FormatBool(extractContent),
|
||||
}
|
||||
if workers > 0 {
|
||||
jobData["workers"] = strconv.Itoa(workers)
|
||||
}
|
||||
|
||||
job, _, err := c.CreateJob(context.TODO(), &model.Job{
|
||||
Type: model.JobTypeImportProcess,
|
||||
Data: map[string]string{
|
||||
"import_file": importFile,
|
||||
"local_mode": strconv.FormatBool(isLocal && bypassUpload),
|
||||
"extract_content": strconv.FormatBool(extractContent),
|
||||
},
|
||||
Data: jobData,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create import process job: %w", err)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
|
@ -211,28 +213,85 @@ func (s *MmctlUnitTestSuite) TestImportJobListCmdF() {
|
|||
}
|
||||
|
||||
func (s *MmctlUnitTestSuite) TestImportProcessCmdF() {
|
||||
printer.Clean()
|
||||
importFile := "import.zip"
|
||||
mockJob := &model.Job{
|
||||
Type: model.JobTypeImportProcess,
|
||||
Data: map[string]string{
|
||||
"import_file": importFile,
|
||||
"local_mode": "false",
|
||||
"extract_content": "false",
|
||||
},
|
||||
}
|
||||
s.Run("default workers", func() {
|
||||
printer.Clean()
|
||||
importFile := "import.zip"
|
||||
mockJob := &model.Job{
|
||||
Type: model.JobTypeImportProcess,
|
||||
Data: map[string]string{
|
||||
"import_file": importFile,
|
||||
"local_mode": "false",
|
||||
"extract_content": "false",
|
||||
},
|
||||
}
|
||||
|
||||
s.client.
|
||||
EXPECT().
|
||||
CreateJob(context.TODO(), mockJob).
|
||||
Return(mockJob, &model.Response{}, nil).
|
||||
Times(1)
|
||||
s.client.
|
||||
EXPECT().
|
||||
CreateJob(context.TODO(), mockJob).
|
||||
Return(mockJob, &model.Response{}, nil).
|
||||
Times(1)
|
||||
|
||||
err := importProcessCmdF(s.client, &cobra.Command{}, []string{importFile})
|
||||
s.Require().Nil(err)
|
||||
s.Len(printer.GetLines(), 1)
|
||||
s.Empty(printer.GetErrorLines())
|
||||
s.Equal(mockJob, printer.GetLines()[0].(*model.Job))
|
||||
cmd := &cobra.Command{}
|
||||
cmd.Flags().Bool("bypass-upload", false, "")
|
||||
cmd.Flags().Bool("extract-content", false, "")
|
||||
cmd.Flags().Int("workers", 0, "")
|
||||
|
||||
err := importProcessCmdF(s.client, cmd, []string{importFile})
|
||||
s.Require().Nil(err)
|
||||
s.Len(printer.GetLines(), 1)
|
||||
s.Empty(printer.GetErrorLines())
|
||||
s.Equal(mockJob, printer.GetLines()[0].(*model.Job))
|
||||
})
|
||||
|
||||
s.Run("workers exceeds max", func() {
|
||||
printer.Clean()
|
||||
importFile := "import.zip"
|
||||
tooMany := runtime.NumCPU()*4 + 1
|
||||
|
||||
cmd := &cobra.Command{}
|
||||
cmd.Flags().Bool("bypass-upload", false, "")
|
||||
cmd.Flags().Bool("extract-content", false, "")
|
||||
cmd.Flags().Int("workers", 0, "")
|
||||
_ = cmd.Flags().Set("workers", strconv.Itoa(tooMany))
|
||||
|
||||
err := importProcessCmdF(s.client, cmd, []string{importFile})
|
||||
s.Require().NotNil(err)
|
||||
s.Contains(err.Error(), "exceeds maximum allowed")
|
||||
s.Empty(printer.GetLines())
|
||||
s.Empty(printer.GetErrorLines())
|
||||
})
|
||||
|
||||
s.Run("custom workers", func() {
|
||||
printer.Clean()
|
||||
importFile := "import.zip"
|
||||
mockJob := &model.Job{
|
||||
Type: model.JobTypeImportProcess,
|
||||
Data: map[string]string{
|
||||
"import_file": importFile,
|
||||
"local_mode": "false",
|
||||
"extract_content": "false",
|
||||
"workers": "2",
|
||||
},
|
||||
}
|
||||
|
||||
s.client.
|
||||
EXPECT().
|
||||
CreateJob(context.TODO(), mockJob).
|
||||
Return(mockJob, &model.Response{}, nil).
|
||||
Times(1)
|
||||
|
||||
cmd := &cobra.Command{}
|
||||
cmd.Flags().Bool("bypass-upload", false, "")
|
||||
cmd.Flags().Bool("extract-content", false, "")
|
||||
cmd.Flags().Int("workers", 0, "")
|
||||
_ = cmd.Flags().Set("workers", "2")
|
||||
|
||||
err := importProcessCmdF(s.client, cmd, []string{importFile})
|
||||
s.Require().Nil(err)
|
||||
s.Len(printer.GetLines(), 1)
|
||||
s.Empty(printer.GetErrorLines())
|
||||
s.Equal(mockJob, printer.GetLines()[0].(*model.Job))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *MmctlUnitTestSuite) TestImportValidateCmdF() {
|
||||
|
|
|
|||
Loading…
Reference in a new issue