From 3e38cbc5ca2230b513d12748d7def54075d5f3fc Mon Sep 17 00:00:00 2001 From: Doug Lauder Date: Fri, 13 Mar 2026 21:30:32 -0400 Subject: [PATCH] Add --workers flag to mmctl import process to control concurrency (#35582) * Add --workers flag to mmctl import process to control concurrency The bulk import worker count was hardcoded to runtime.NumCPU(), causing high database load on the master during imports on live systems. This is particularly impactful for incremental Slack imports where all users are re-imported each time, generating 8-15 DB operations per user against the master (due to LockToMaster). The new --workers flag allows administrators to reduce concurrency (e.g., --workers 1) to minimize impact on live users at the cost of longer import duration. Defaults to 0 which preserves the existing runtime.NumCPU() behavior. * Add max workers limit, capped at CPU Count * 4 --- server/channels/jobs/import_process/worker.go | 10 +- server/cmd/mmctl/commands/import.go | 23 ++++- server/cmd/mmctl/commands/import_test.go | 99 +++++++++++++++---- 3 files changed, 106 insertions(+), 26 deletions(-) diff --git a/server/channels/jobs/import_process/worker.go b/server/channels/jobs/import_process/worker.go index f9e974b8405..df1113b8479 100644 --- a/server/channels/jobs/import_process/worker.go +++ b/server/channels/jobs/import_process/worker.go @@ -124,8 +124,16 @@ func MakeWorker(jobServer *jobs.JobServer, app AppIface) *jobs.SimpleWorker { defer jsonFile.Close() extractContent := job.Data["extract_content"] == "true" + + numWorkers := runtime.NumCPU() + if workersStr, ok := job.Data["workers"]; ok { + if n, err := strconv.Atoi(workersStr); err == nil && n > 0 { + numWorkers = n + } + } + // do the actual import. - lineNumber, appErr := app.BulkImportWithPath(appContext, jsonFile, importZipReader, false, extractContent, runtime.NumCPU(), model.ExportDataDir) + lineNumber, appErr := app.BulkImportWithPath(appContext, jsonFile, importZipReader, false, extractContent, numWorkers, model.ExportDataDir) if appErr != nil { job.Data["line_number"] = strconv.Itoa(lineNumber) return appErr diff --git a/server/cmd/mmctl/commands/import.go b/server/cmd/mmctl/commands/import.go index 8283127dd0a..cb6173d35e8 100644 --- a/server/cmd/mmctl/commands/import.go +++ b/server/cmd/mmctl/commands/import.go @@ -11,6 +11,7 @@ import ( "os" "path" "path/filepath" + "runtime" "strconv" "strings" "text/template" @@ -123,6 +124,7 @@ func init() { ImportProcessCmd.Flags().Bool("bypass-upload", false, "If this is set, the file is not processed from the server, but rather directly read from the filesystem. Works only in --local mode.") ImportProcessCmd.Flags().Bool("extract-content", true, "If this is set, document attachments will be extracted and indexed during the import process. It is advised to disable it to improve performance.") + ImportProcessCmd.Flags().Int("workers", 0, "The number of concurrent import worker goroutines. Controls database load during import. When set to 0 (default), uses the number of CPUs available. Maximum allowed is 4x the CPU count.") ImportListCmd.AddCommand( ImportListAvailableCmd, @@ -310,14 +312,25 @@ func importProcessCmdF(c client.Client, command *cobra.Command, args []string) e } extractContent, _ := command.Flags().GetBool("extract-content") + workers, _ := command.Flags().GetInt("workers") + + maxWorkers := runtime.NumCPU() * 4 + if workers > maxWorkers { + return fmt.Errorf("workers value %d exceeds maximum allowed (%d = 4 * CPU count)", workers, maxWorkers) + } + + jobData := map[string]string{ + "import_file": importFile, + "local_mode": strconv.FormatBool(isLocal && bypassUpload), + "extract_content": strconv.FormatBool(extractContent), + } + if workers > 0 { + jobData["workers"] = strconv.Itoa(workers) + } job, _, err := c.CreateJob(context.TODO(), &model.Job{ Type: model.JobTypeImportProcess, - Data: map[string]string{ - "import_file": importFile, - "local_mode": strconv.FormatBool(isLocal && bypassUpload), - "extract_content": strconv.FormatBool(extractContent), - }, + Data: jobData, }) if err != nil { return fmt.Errorf("failed to create import process job: %w", err) diff --git a/server/cmd/mmctl/commands/import_test.go b/server/cmd/mmctl/commands/import_test.go index 68cbb2242c6..f29433e0d16 100644 --- a/server/cmd/mmctl/commands/import_test.go +++ b/server/cmd/mmctl/commands/import_test.go @@ -10,6 +10,8 @@ import ( "net/http" "os" "path/filepath" + "runtime" + "strconv" "strings" "github.com/pkg/errors" @@ -211,28 +213,85 @@ func (s *MmctlUnitTestSuite) TestImportJobListCmdF() { } func (s *MmctlUnitTestSuite) TestImportProcessCmdF() { - printer.Clean() - importFile := "import.zip" - mockJob := &model.Job{ - Type: model.JobTypeImportProcess, - Data: map[string]string{ - "import_file": importFile, - "local_mode": "false", - "extract_content": "false", - }, - } + s.Run("default workers", func() { + printer.Clean() + importFile := "import.zip" + mockJob := &model.Job{ + Type: model.JobTypeImportProcess, + Data: map[string]string{ + "import_file": importFile, + "local_mode": "false", + "extract_content": "false", + }, + } - s.client. - EXPECT(). - CreateJob(context.TODO(), mockJob). - Return(mockJob, &model.Response{}, nil). - Times(1) + s.client. + EXPECT(). + CreateJob(context.TODO(), mockJob). + Return(mockJob, &model.Response{}, nil). + Times(1) - err := importProcessCmdF(s.client, &cobra.Command{}, []string{importFile}) - s.Require().Nil(err) - s.Len(printer.GetLines(), 1) - s.Empty(printer.GetErrorLines()) - s.Equal(mockJob, printer.GetLines()[0].(*model.Job)) + cmd := &cobra.Command{} + cmd.Flags().Bool("bypass-upload", false, "") + cmd.Flags().Bool("extract-content", false, "") + cmd.Flags().Int("workers", 0, "") + + err := importProcessCmdF(s.client, cmd, []string{importFile}) + s.Require().Nil(err) + s.Len(printer.GetLines(), 1) + s.Empty(printer.GetErrorLines()) + s.Equal(mockJob, printer.GetLines()[0].(*model.Job)) + }) + + s.Run("workers exceeds max", func() { + printer.Clean() + importFile := "import.zip" + tooMany := runtime.NumCPU()*4 + 1 + + cmd := &cobra.Command{} + cmd.Flags().Bool("bypass-upload", false, "") + cmd.Flags().Bool("extract-content", false, "") + cmd.Flags().Int("workers", 0, "") + _ = cmd.Flags().Set("workers", strconv.Itoa(tooMany)) + + err := importProcessCmdF(s.client, cmd, []string{importFile}) + s.Require().NotNil(err) + s.Contains(err.Error(), "exceeds maximum allowed") + s.Empty(printer.GetLines()) + s.Empty(printer.GetErrorLines()) + }) + + s.Run("custom workers", func() { + printer.Clean() + importFile := "import.zip" + mockJob := &model.Job{ + Type: model.JobTypeImportProcess, + Data: map[string]string{ + "import_file": importFile, + "local_mode": "false", + "extract_content": "false", + "workers": "2", + }, + } + + s.client. + EXPECT(). + CreateJob(context.TODO(), mockJob). + Return(mockJob, &model.Response{}, nil). + Times(1) + + cmd := &cobra.Command{} + cmd.Flags().Bool("bypass-upload", false, "") + cmd.Flags().Bool("extract-content", false, "") + cmd.Flags().Int("workers", 0, "") + _ = cmd.Flags().Set("workers", "2") + + err := importProcessCmdF(s.client, cmd, []string{importFile}) + s.Require().Nil(err) + s.Len(printer.GetLines(), 1) + s.Empty(printer.GetErrorLines()) + s.Equal(mockJob, printer.GetLines()[0].(*model.Job)) + }) } func (s *MmctlUnitTestSuite) TestImportValidateCmdF() {