diff --git a/api4/api.go b/api4/api.go index 0e8ba10e266..ad761ec14a4 100644 --- a/api4/api.go +++ b/api4/api.go @@ -123,6 +123,8 @@ type Routes struct { Groups *mux.Router // 'api/v4/groups' Cloud *mux.Router // 'api/v4/cloud' + + Imports *mux.Router // 'api/v4/imports' } type API struct { @@ -235,6 +237,8 @@ func Init(configservice configservice.ConfigService, globalOptionsFunc app.AppOp api.BaseRoutes.Cloud = api.BaseRoutes.ApiRoot.PathPrefix("/cloud").Subrouter() + api.BaseRoutes.Imports = api.BaseRoutes.ApiRoot.PathPrefix("/imports").Subrouter() + api.InitUser() api.InitBot() api.InitTeam() @@ -271,6 +275,7 @@ func Init(configservice configservice.ConfigService, globalOptionsFunc app.AppOp api.InitGroup() api.InitAction() api.InitCloud() + api.InitImport() root.Handle("/api/v4/{anything:.*}", http.HandlerFunc(api.Handle404)) @@ -335,6 +340,13 @@ func InitLocal(configservice configservice.ConfigService, globalOptionsFunc app. api.BaseRoutes.Roles = api.BaseRoutes.ApiRoot.PathPrefix("/roles").Subrouter() + api.BaseRoutes.Uploads = api.BaseRoutes.ApiRoot.PathPrefix("/uploads").Subrouter() + api.BaseRoutes.Upload = api.BaseRoutes.Uploads.PathPrefix("/{upload_id:[A-Za-z0-9]+}").Subrouter() + + api.BaseRoutes.Imports = api.BaseRoutes.ApiRoot.PathPrefix("/imports").Subrouter() + + api.BaseRoutes.Jobs = api.BaseRoutes.ApiRoot.PathPrefix("/jobs").Subrouter() + api.InitUserLocal() api.InitTeamLocal() api.InitChannelLocal() @@ -349,6 +361,9 @@ func InitLocal(configservice configservice.ConfigService, globalOptionsFunc app. api.InitSystemLocal() api.InitPostLocal() api.InitRoleLocal() + api.InitUploadLocal() + api.InitImportLocal() + api.InitJobLocal() root.Handle("/api/v4/{anything:.*}", http.HandlerFunc(api.Handle404)) diff --git a/api4/import.go b/api4/import.go new file mode 100644 index 00000000000..b91d5c5a679 --- /dev/null +++ b/api4/import.go @@ -0,0 +1,36 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package api4 + +import ( + "encoding/json" + "net/http" + + "github.com/mattermost/mattermost-server/v5/model" +) + +func (api *API) InitImport() { + api.BaseRoutes.Imports.Handle("", api.ApiSessionRequired(listImports)).Methods("GET") +} + +func listImports(c *Context, w http.ResponseWriter, r *http.Request) { + if !c.IsSystemAdmin() { + c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM) + return + } + + imports, appErr := c.App.ListImports() + if appErr != nil { + c.Err = appErr + return + } + + data, err := json.Marshal(imports) + if err != nil { + c.Err = model.NewAppError("listImports", "app.import.marshal.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + w.Write(data) +} diff --git a/api4/import_local.go b/api4/import_local.go new file mode 100644 index 00000000000..bf338c88f60 --- /dev/null +++ b/api4/import_local.go @@ -0,0 +1,8 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package api4 + +func (api *API) InitImportLocal() { + api.BaseRoutes.Imports.Handle("", api.ApiLocal(listImports)).Methods("GET") +} diff --git a/api4/import_test.go b/api4/import_test.go new file mode 100644 index 00000000000..746226f6f1f --- /dev/null +++ b/api4/import_test.go @@ -0,0 +1,106 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package api4 + +import ( + "os" + "path/filepath" + "testing" + + "github.com/mattermost/mattermost-server/v5/model" + "github.com/mattermost/mattermost-server/v5/utils/fileutils" + + "github.com/stretchr/testify/require" +) + +func TestListImports(t *testing.T) { + th := Setup(t) + defer th.TearDown() + + testsDir, _ := fileutils.FindDir("tests") + require.NotEmpty(t, testsDir) + + uploadNewImport := func(c *model.Client4, t *testing.T) string { + file, err := os.Open(testsDir + "/import_test.zip") + require.Nil(t, err) + + info, err := file.Stat() + require.Nil(t, err) + + us := &model.UploadSession{ + Filename: info.Name(), + FileSize: info.Size(), + Type: model.UploadTypeImport, + } + + if c == th.LocalClient { + us.UserId = model.UploadNoUserID + } + + u, resp := c.CreateUpload(us) + require.Nil(t, resp.Error) + require.NotNil(t, u) + + finfo, resp := c.UploadData(u.Id, file) + require.Nil(t, resp.Error) + require.NotNil(t, finfo) + + return u.Id + } + + t.Run("no permissions", func(t *testing.T) { + imports, resp := th.Client.ListImports() + require.Error(t, resp.Error) + require.Equal(t, "api.context.permissions.app_error", resp.Error.Id) + require.Nil(t, imports) + }) + + dataDir, found := fileutils.FindDir("data") + require.True(t, found) + + th.TestForSystemAdminAndLocal(t, func(t *testing.T, c *model.Client4) { + imports, resp := c.ListImports() + require.Nil(t, resp.Error) + require.Empty(t, imports) + }, "no imports") + + th.TestForSystemAdminAndLocal(t, func(t *testing.T, c *model.Client4) { + id := uploadNewImport(c, t) + id2 := uploadNewImport(c, t) + + importDir := filepath.Join(dataDir, "import") + f, err := os.Create(filepath.Join(importDir, "import.zip.tmp")) + require.Nil(t, err) + f.Close() + + imports, resp := c.ListImports() + require.Nil(t, resp.Error) + require.NotEmpty(t, imports) + require.Len(t, imports, 2) + require.Contains(t, imports, id+"_import_test.zip") + require.Contains(t, imports, id2+"_import_test.zip") + + require.Nil(t, os.RemoveAll(importDir)) + }, "expected imports") + + th.TestForSystemAdminAndLocal(t, func(t *testing.T, c *model.Client4) { + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ImportSettings.Directory = "import_new" }) + defer th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ImportSettings.Directory = "import" }) + + importDir := filepath.Join(dataDir, "import_new") + + imports, resp := c.ListImports() + require.Nil(t, resp.Error) + require.Empty(t, imports) + + id := uploadNewImport(c, t) + imports, resp = c.ListImports() + require.Nil(t, resp.Error) + require.NotEmpty(t, imports) + require.Len(t, imports, 1) + require.Equal(t, id+"_import_test.zip", imports[0]) + + require.Nil(t, os.RemoveAll(importDir)) + }, "change import directory") +} diff --git a/api4/job_local.go b/api4/job_local.go new file mode 100644 index 00000000000..0e9dc346748 --- /dev/null +++ b/api4/job_local.go @@ -0,0 +1,12 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package api4 + +func (api *API) InitJobLocal() { + api.BaseRoutes.Jobs.Handle("", api.ApiLocal(getJobs)).Methods("GET") + api.BaseRoutes.Jobs.Handle("", api.ApiLocal(createJob)).Methods("POST") + api.BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}", api.ApiLocal(getJob)).Methods("GET") + api.BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/cancel", api.ApiLocal(cancelJob)).Methods("POST") + api.BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}", api.ApiLocal(getJobsByType)).Methods("GET") +} diff --git a/api4/upload.go b/api4/upload.go index 236276009af..c1a2ac0b909 100644 --- a/api4/upload.go +++ b/api4/upload.go @@ -37,14 +37,23 @@ func createUpload(c *Context, w http.ResponseWriter, r *http.Request) { defer c.LogAuditRec(auditRec) auditRec.AddMeta("upload", us) - if !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) { - c.SetPermissionError(model.PERMISSION_UPLOAD_FILE) - return + if us.Type == model.UploadTypeImport { + if !c.IsSystemAdmin() { + c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM) + return + } + } else { + if !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) { + c.SetPermissionError(model.PERMISSION_UPLOAD_FILE) + return + } + us.Type = model.UploadTypeAttachment } us.Id = model.NewId() - us.Type = model.UploadTypeAttachment - us.UserId = c.App.Session().UserId + if c.App.Session().UserId != "" { + us.UserId = c.App.Session().UserId + } us, err := c.App.CreateUploadSession(us) if err != nil { c.Err = err @@ -68,7 +77,7 @@ func getUpload(c *Context, w http.ResponseWriter, r *http.Request) { return } - if us.UserId != c.App.Session().UserId { + if us.UserId != c.App.Session().UserId && !c.IsSystemAdmin() { c.Err = model.NewAppError("getUpload", "api.upload.get_upload.forbidden.app_error", nil, "", http.StatusForbidden) return } @@ -98,9 +107,16 @@ func uploadData(c *Context, w http.ResponseWriter, r *http.Request) { return } - if us.UserId != c.App.Session().UserId || !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) { - c.SetPermissionError(model.PERMISSION_UPLOAD_FILE) - return + if us.Type == model.UploadTypeImport { + if !c.IsSystemAdmin() { + c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM) + return + } + } else { + if us.UserId != c.App.Session().UserId || !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) { + c.SetPermissionError(model.PERMISSION_UPLOAD_FILE) + return + } } boundary, parseErr := parseMultipartRequestHeader(r) diff --git a/api4/upload_local.go b/api4/upload_local.go new file mode 100644 index 00000000000..136beb7c2fc --- /dev/null +++ b/api4/upload_local.go @@ -0,0 +1,10 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package api4 + +func (api *API) InitUploadLocal() { + api.BaseRoutes.Uploads.Handle("", api.ApiLocal(createUpload)).Methods("POST") + api.BaseRoutes.Upload.Handle("", api.ApiLocal(getUpload)).Methods("GET") + api.BaseRoutes.Upload.Handle("", api.ApiLocal(uploadData)).Methods("POST") +} diff --git a/api4/upload_test.go b/api4/upload_test.go index 92b0c8704a6..bd3ac4c1a3b 100644 --- a/api4/upload_test.go +++ b/api4/upload_test.go @@ -8,9 +8,11 @@ import ( "io" "mime/multipart" "net/http" + "os" "testing" "github.com/mattermost/mattermost-server/v5/model" + "github.com/mattermost/mattermost-server/v5/utils/fileutils" "github.com/stretchr/testify/require" ) @@ -51,6 +53,41 @@ func TestCreateUpload(t *testing.T) { require.NotEmpty(t, u) require.Equal(t, http.StatusCreated, resp.StatusCode) }) + + t.Run("import file", func(t *testing.T) { + testsDir, _ := fileutils.FindDir("tests") + + importFile, err := os.Open(testsDir + "/import_test.zip") + require.Nil(t, err) + defer importFile.Close() + + info, err := importFile.Stat() + require.Nil(t, err) + + t.Run("permissions error", func(t *testing.T) { + us := &model.UploadSession{ + Filename: info.Name(), + FileSize: info.Size(), + Type: model.UploadTypeImport, + } + u, resp := th.Client.CreateUpload(us) + require.Nil(t, u) + require.Error(t, resp.Error) + require.Equal(t, "api.context.permissions.app_error", resp.Error.Id) + require.Equal(t, http.StatusForbidden, resp.StatusCode) + }) + + t.Run("success", func(t *testing.T) { + us := &model.UploadSession{ + Filename: info.Name(), + FileSize: info.Size(), + Type: model.UploadTypeImport, + } + u, resp := th.SystemAdminClient.CreateUpload(us) + require.Nil(t, resp.Error) + require.NotEmpty(t, u) + }) + }) } func TestGetUpload(t *testing.T) { diff --git a/api4/user_local.go b/api4/user_local.go index 5b4b0488288..de0294389d5 100644 --- a/api4/user_local.go +++ b/api4/user_local.go @@ -38,6 +38,8 @@ func (api *API) InitUserLocal() { api.BaseRoutes.Users.Handle("/migrate_auth/ldap", api.ApiLocal(migrateAuthToLDAP)).Methods("POST") api.BaseRoutes.Users.Handle("/migrate_auth/saml", api.ApiLocal(migrateAuthToSaml)).Methods("POST") + + api.BaseRoutes.User.Handle("/uploads", api.ApiLocal(localGetUploadsForUser)).Methods("GET") } func localGetUsers(c *Context, w http.ResponseWriter, r *http.Request) { @@ -315,3 +317,13 @@ func localGetUserByEmail(c *Context, w http.ResponseWriter, r *http.Request) { w.Header().Set(model.HEADER_ETAG_SERVER, etag) w.Write([]byte(user.ToJson())) } + +func localGetUploadsForUser(c *Context, w http.ResponseWriter, r *http.Request) { + uss, err := c.App.GetUploadSessionsForUser(c.Params.UserId) + if err != nil { + c.Err = err + return + } + + w.Write([]byte(model.UploadSessionsToJson(uss))) +} diff --git a/app/app.go b/app/app.go index 3cfe92f7e1a..d52d27c4813 100644 --- a/app/app.go +++ b/app/app.go @@ -115,6 +115,9 @@ func (a *App) initJobs() { if productNoticesJobInterface != nil { a.srv.Jobs.ProductNotices = productNoticesJobInterface(a) } + if jobsImportProcessInterface != nil { + a.srv.Jobs.ImportProcess = jobsImportProcessInterface(a) + } if jobsActiveUsersInterface != nil { a.srv.Jobs.ActiveUsers = jobsActiveUsersInterface(a) diff --git a/app/app_iface.go b/app/app_iface.go index da786f83286..f16eeb87efb 100644 --- a/app/app_iface.go +++ b/app/app_iface.go @@ -385,6 +385,7 @@ type AppIface interface { BuildSamlMetadataObject(idpMetadata []byte) (*model.SamlMetadataResponse, *model.AppError) BulkExport(writer io.Writer, file string, pathToEmojiDir string, dirNameToExportEmoji string) *model.AppError BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) + BulkImportWithPath(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int) CancelJob(jobId string) *model.AppError ChannelMembersToAdd(since int64, channelID *string) ([]*model.UserChannelIDPair, *model.AppError) ChannelMembersToRemove(teamID *string) ([]*model.ChannelMember, *model.AppError) @@ -494,6 +495,7 @@ type AppIface interface { FetchSamlMetadataFromIdp(url string) ([]byte, *model.AppError) FileBackend() (filesstore.FileBackend, *model.AppError) FileExists(path string) (bool, *model.AppError) + FileSize(path string) (int64, *model.AppError) FillInChannelProps(channel *model.Channel) *model.AppError FillInChannelsProps(channelList *model.ChannelList) *model.AppError FilterUsersByVisible(viewer *model.User, otherUsers []*model.User) ([]*model.User, *model.AppError) @@ -770,6 +772,7 @@ type AppIface interface { LimitedClientConfig() map[string]string ListAllCommands(teamId string, T goi18n.TranslateFunc) ([]*model.Command, *model.AppError) ListDirectory(path string) ([]string, *model.AppError) + ListImports() ([]string, *model.AppError) ListPluginKeys(pluginId string, page, perPage int) ([]string, *model.AppError) ListTeamCommands(teamId string) ([]*model.Command, *model.AppError) Log() *mlog.Logger diff --git a/app/enterprise.go b/app/enterprise.go index 7a18b625f21..523d31fc404 100644 --- a/app/enterprise.go +++ b/app/enterprise.go @@ -108,6 +108,12 @@ func RegisterJobsExpiryNotifyJobInterface(f func(*App) tjobs.ExpiryNotifyJobInte jobsExpiryNotifyInterface = f } +var jobsImportProcessInterface func(*App) tjobs.ImportProcessInterface + +func RegisterJobsImportProcessInterface(f func(*App) tjobs.ImportProcessInterface) { + jobsImportProcessInterface = f +} + var productNoticesJobInterface func(*App) tjobs.ProductNoticesJobInterface func RegisterProductNoticesJobInterface(f func(*App) tjobs.ProductNoticesJobInterface) { diff --git a/app/file.go b/app/file.go index 92dec77a4b2..83f32c749f3 100644 --- a/app/file.go +++ b/app/file.go @@ -101,6 +101,14 @@ func (a *App) FileExists(path string) (bool, *model.AppError) { return backend.FileExists(path) } +func (a *App) FileSize(path string) (int64, *model.AppError) { + backend, err := a.FileBackend() + if err != nil { + return 0, err + } + return backend.FileSize(path) +} + func (a *App) MoveFile(oldPath, newPath string) *model.AppError { backend, err := a.FileBackend() if err != nil { diff --git a/app/import.go b/app/import.go index 42c9180b702..cefbb97dc1f 100644 --- a/app/import.go +++ b/app/import.go @@ -5,9 +5,11 @@ package app import ( "bufio" + "bytes" "encoding/json" "io" "net/http" + "path/filepath" "strings" "sync" @@ -29,6 +31,46 @@ func stopOnError(err LineImportWorkerError) bool { return true } +func rewriteAttachmentPaths(files *[]AttachmentImportData, basePath string) { + if files == nil { + return + } + for _, f := range *files { + if f.Path != nil { + *f.Path = filepath.Join(basePath, *f.Path) + } + } +} + +func rewriteFilePaths(line *LineImportData, basePath string) { + switch line.Type { + case "post", "direct_post": + var replies []ReplyImportData + if line.Type == "direct_post" { + rewriteAttachmentPaths(line.DirectPost.Attachments, basePath) + if line.DirectPost.Replies != nil { + replies = *line.DirectPost.Replies + } + } else { + rewriteAttachmentPaths(line.Post.Attachments, basePath) + if line.Post.Replies != nil { + replies = *line.Post.Replies + } + } + for _, reply := range replies { + rewriteAttachmentPaths(reply.Attachments, basePath) + } + case "user": + if line.User.ProfileImage != nil { + *line.User.ProfileImage = filepath.Join(basePath, *line.User.ProfileImage) + } + case "emoji": + if line.Emoji.Image != nil { + *line.Emoji.Image = filepath.Join(basePath, *line.Emoji.Image) + } + } +} + func (a *App) bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan LineImportWorkerData, errors chan<- LineImportWorkerError) { postLines := []LineImportWorkerData{} directPostLines := []LineImportWorkerData{} @@ -77,6 +119,14 @@ func (a *App) bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan Lin } func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) { + return a.bulkImport(fileReader, dryRun, workers, "") +} + +func (a *App) BulkImportWithPath(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int) { + return a.bulkImport(fileReader, dryRun, workers, importPath) +} + +func (a *App) bulkImport(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int) { scanner := bufio.NewScanner(fileReader) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, maxScanTokenSize) @@ -92,7 +142,7 @@ func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model lastLineType := "" for scanner.Scan() { - decoder := json.NewDecoder(strings.NewReader(scanner.Text())) + decoder := json.NewDecoder(bytes.NewReader(scanner.Bytes())) lineNumber++ var line LineImportData @@ -100,6 +150,10 @@ func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model return model.NewAppError("BulkImport", "app.import.bulk_import.json_decode.error", nil, err.Error(), http.StatusBadRequest), lineNumber } + if importPath != "" { + rewriteFilePaths(&line, importPath) + } + if lineNumber == 1 { importDataFileVersion, appErr := processImportDataFileVersionLine(line) if appErr != nil { @@ -214,3 +268,20 @@ func (a *App) importLine(line LineImportData, dryRun bool) *model.AppError { return model.NewAppError("BulkImport", "app.import.import_line.unknown_line_type.error", map[string]interface{}{"Type": line.Type}, "", http.StatusBadRequest) } } + +func (a *App) ListImports() ([]string, *model.AppError) { + imports, appErr := a.ListDirectory(*a.Config().ImportSettings.Directory) + if appErr != nil { + return nil, appErr + } + + results := make([]string, 0, len(imports)) + for i := 0; i < len(imports); i++ { + filename := filepath.Base(imports[i]) + if !strings.HasSuffix(filename, incompleteUploadSuffix) { + results = append(results, filename) + } + } + + return results, nil +} diff --git a/app/import_functions.go b/app/import_functions.go index 73487474aca..9a51b8c1033 100644 --- a/app/import_functions.go +++ b/app/import_functions.go @@ -532,6 +532,7 @@ func (a *App) importUser(data *UserImportData, dryRun bool) *model.AppError { if err != nil { mlog.Error("Unable to open the profile image.", mlog.Any("err", err)) } + defer file.Close() if err := a.SetProfileImageFromMultiPartFile(savedUser.Id, file); err != nil { mlog.Error("Unable to set the profile image from a file.", mlog.Any("err", err)) } @@ -1740,6 +1741,7 @@ func (a *App) importEmoji(data *EmojiImportData, dryRun bool) *model.AppError { if err != nil { return model.NewAppError("BulkImport", "app.import.emoji.bad_file.error", map[string]interface{}{"EmojiName": *data.Name}, "", http.StatusBadRequest) } + defer file.Close() if _, err := a.WriteFile(file, getEmojiImagePath(emoji.Id)); err != nil { return err diff --git a/app/import_test.go b/app/import_test.go index 6d25f1b2fce..6bb0e9d20d8 100644 --- a/app/import_test.go +++ b/app/import_test.go @@ -4,8 +4,11 @@ package app import ( + "io/ioutil" "net/http" + "os" "path/filepath" + "runtime" "strings" "testing" @@ -13,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/mattermost/mattermost-server/v5/model" + "github.com/mattermost/mattermost-server/v5/utils" "github.com/mattermost/mattermost-server/v5/utils/fileutils" ) @@ -192,7 +196,7 @@ func TestImportBulkImport(t *testing.T) { // Run bulk import using a valid and large input and a \r\n line break. t.Run("", func(t *testing.T) { - posts := `{"type": "post"` + strings.Repeat(`, "post": {"team": "`+teamName+`", "channel": "`+channelName+`", "user": "`+username+`", "message": "Repeat after me", "create_at": 193456789012}`, 1E4) + "}" + posts := `{"type": "post"` + strings.Repeat(`, "post": {"team": "`+teamName+`", "channel": "`+channelName+`", "user": "`+username+`", "message": "Repeat after me", "create_at": 193456789012}`, 1e4) + "}" data4 := `{"type": "version", "version": 1} {"type": "team", "team": {"type": "O", "display_name": "lskmw2d7a5ao7ppwqh5ljchvr4", "name": "` + teamName + `"}} {"type": "channel", "channel": {"type": "O", "display_name": "xr6m6udffngark2uekvr3hoeny", "team": "` + teamName + `", "name": "` + channelName + `"}} @@ -263,3 +267,124 @@ func AssertFileIdsInPost(files []*model.FileInfo, th *TestHelper, t *testing.T) assert.Contains(t, posts[0].FileIds, file.Id) } } + +func TestRewriteFilePaths(t *testing.T) { + genAttachments := func() *[]AttachmentImportData { + return &[]AttachmentImportData{ + { + Path: model.NewString("file.jpg"), + }, + { + Path: model.NewString("somedir/file.jpg"), + }, + } + } + + line := LineImportData{ + Type: "post", + Post: &PostImportData{ + Attachments: genAttachments(), + }, + } + + line2 := LineImportData{ + Type: "direct_post", + DirectPost: &DirectPostImportData{ + Attachments: genAttachments(), + }, + } + + userLine := LineImportData{ + Type: "user", + User: &UserImportData{ + ProfileImage: model.NewString("profile.jpg"), + }, + } + + emojiLine := LineImportData{ + Type: "emoji", + Emoji: &EmojiImportData{ + Image: model.NewString("emoji.png"), + }, + } + + t.Run("empty path", func(t *testing.T) { + expected := &[]AttachmentImportData{ + { + Path: model.NewString("file.jpg"), + }, + { + Path: model.NewString("somedir/file.jpg"), + }, + } + rewriteFilePaths(&line, "") + require.Equal(t, expected, line.Post.Attachments) + rewriteFilePaths(&line2, "") + require.Equal(t, expected, line2.DirectPost.Attachments) + }) + + t.Run("valid path", func(t *testing.T) { + expected := &[]AttachmentImportData{ + { + Path: model.NewString("/tmp/file.jpg"), + }, + { + Path: model.NewString("/tmp/somedir/file.jpg"), + }, + } + + t.Run("post attachments", func(t *testing.T) { + rewriteFilePaths(&line, "/tmp") + require.Equal(t, expected, line.Post.Attachments) + }) + + t.Run("direct post attachments", func(t *testing.T) { + rewriteFilePaths(&line2, "/tmp") + require.Equal(t, expected, line2.DirectPost.Attachments) + }) + + t.Run("profile image", func(t *testing.T) { + expected := "/tmp/profile.jpg" + rewriteFilePaths(&userLine, "/tmp") + require.Equal(t, expected, *userLine.User.ProfileImage) + }) + + t.Run("emoji", func(t *testing.T) { + expected := "/tmp/emoji.png" + rewriteFilePaths(&emojiLine, "/tmp") + require.Equal(t, expected, *emojiLine.Emoji.Image) + }) + }) +} + +func BenchmarkBulkImport(b *testing.B) { + th := Setup(b) + defer th.TearDown() + + testsDir, _ := fileutils.FindDir("tests") + + importFile, err := os.Open(testsDir + "/import_test.zip") + require.Nil(b, err) + defer importFile.Close() + + info, err := importFile.Stat() + require.Nil(b, err) + + dir, err := ioutil.TempDir("", "testimport") + require.Nil(b, err) + defer os.RemoveAll(dir) + + _, err = utils.UnzipToPath(importFile, info.Size(), dir) + require.Nil(b, err) + + jsonFile, err := os.Open(dir + "/import.jsonl") + require.Nil(b, err) + defer jsonFile.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + err, _ := th.App.BulkImportWithPath(jsonFile, false, runtime.NumCPU(), dir) + require.Nil(b, err) + } + b.StopTimer() +} diff --git a/app/opentracing/opentracing_layer.go b/app/opentracing/opentracing_layer.go index acf02250fe2..f033a9e9dad 100644 --- a/app/opentracing/opentracing_layer.go +++ b/app/opentracing/opentracing_layer.go @@ -922,6 +922,28 @@ func (a *OpenTracingAppLayer) BulkImport(fileReader io.Reader, dryRun bool, work return resultVar0, resultVar1 } +func (a *OpenTracingAppLayer) BulkImportWithPath(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int) { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.BulkImportWithPath") + + a.ctx = newCtx + a.app.Srv().Store.SetContext(newCtx) + defer func() { + a.app.Srv().Store.SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0, resultVar1 := a.app.BulkImportWithPath(fileReader, dryRun, workers, importPath) + + if resultVar0 != nil { + span.LogFields(spanlog.Error(resultVar0)) + ext.Error.Set(span, true) + } + + return resultVar0, resultVar1 +} + func (a *OpenTracingAppLayer) CancelJob(jobId string) *model.AppError { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CancelJob") @@ -3667,6 +3689,28 @@ func (a *OpenTracingAppLayer) FileReader(path string) (filesstore.ReadCloseSeeke return resultVar0, resultVar1 } +func (a *OpenTracingAppLayer) FileSize(path string) (int64, *model.AppError) { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.FileSize") + + a.ctx = newCtx + a.app.Srv().Store.SetContext(newCtx) + defer func() { + a.app.Srv().Store.SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0, resultVar1 := a.app.FileSize(path) + + if resultVar1 != nil { + span.LogFields(spanlog.Error(resultVar1)) + ext.Error.Set(span, true) + } + + return resultVar0, resultVar1 +} + func (a *OpenTracingAppLayer) FillInChannelProps(channel *model.Channel) *model.AppError { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.FillInChannelProps") @@ -10252,6 +10296,28 @@ func (a *OpenTracingAppLayer) ListDirectory(path string) ([]string, *model.AppEr return resultVar0, resultVar1 } +func (a *OpenTracingAppLayer) ListImports() ([]string, *model.AppError) { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.ListImports") + + a.ctx = newCtx + a.app.Srv().Store.SetContext(newCtx) + defer func() { + a.app.Srv().Store.SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0, resultVar1 := a.app.ListImports() + + if resultVar1 != nil { + span.LogFields(spanlog.Error(resultVar1)) + ext.Error.Set(span, true) + } + + return resultVar0, resultVar1 +} + func (a *OpenTracingAppLayer) ListPluginKeys(pluginId string, page int, perPage int) ([]string, *model.AppError) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.ListPluginKeys") diff --git a/app/upload.go b/app/upload.go index 0019d7eae34..b56552484bc 100644 --- a/app/upload.go +++ b/app/upload.go @@ -19,6 +19,7 @@ import ( ) const minFirstPartSize = 5 * 1024 * 1024 // 5MB +const incompleteUploadSuffix = ".tmp" func (a *App) runPluginsHook(info *model.FileInfo, file io.Reader) *model.AppError { pluginsEnvironment := a.GetPluginsEnvironment() @@ -108,19 +109,25 @@ func (a *App) CreateUploadSession(us *model.UploadSession) (*model.UploadSession us.FileOffset = 0 now := time.Now() us.CreateAt = model.GetMillisForTime(now) - us.Path = now.Format("20060102") + "/teams/noteam/channels/" + us.ChannelId + "/users/" + us.UserId + "/" + us.Id + "/" + filepath.Base(us.Filename) + if us.Type == model.UploadTypeAttachment { + us.Path = now.Format("20060102") + "/teams/noteam/channels/" + us.ChannelId + "/users/" + us.UserId + "/" + us.Id + "/" + filepath.Base(us.Filename) + } else if us.Type == model.UploadTypeImport { + us.Path = *a.Config().ImportSettings.Directory + "/" + us.Id + "_" + filepath.Base(us.Filename) + } if err := us.IsValid(); err != nil { return nil, err } - channel, err := a.GetChannel(us.ChannelId) - if err != nil { - return nil, model.NewAppError("CreateUploadSession", "app.upload.create.incorrect_channel_id.app_error", - map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest) - } - if channel.DeleteAt != 0 { - return nil, model.NewAppError("CreateUploadSession", "app.upload.create.cannot_upload_to_deleted_channel.app_error", - map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest) + if us.Type == model.UploadTypeAttachment { + channel, err := a.GetChannel(us.ChannelId) + if err != nil { + return nil, model.NewAppError("CreateUploadSession", "app.upload.create.incorrect_channel_id.app_error", + map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest) + } + if channel.DeleteAt != 0 { + return nil, model.NewAppError("CreateUploadSession", "app.upload.create.cannot_upload_to_deleted_channel.app_error", + map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest) + } } us, storeErr := a.Srv().Store.UploadSession().Save(us) @@ -186,6 +193,11 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo nil, "FileOffset mismatch", http.StatusBadRequest) } + uploadPath := us.Path + if us.Type == model.UploadTypeImport { + uploadPath += incompleteUploadSuffix + } + // make sure it's not possible to upload more data than what is expected. lr := &io.LimitedReader{ R: rd, @@ -195,12 +207,12 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo var written int64 if us.FileOffset == 0 { // new upload - written, err = a.WriteFile(lr, us.Path) + written, err = a.WriteFile(lr, uploadPath) if err != nil && written == 0 { return nil, err } if written < minFirstPartSize && written != us.FileSize { - a.RemoveFile(us.Path) + a.RemoveFile(uploadPath) var errStr string if err != nil { errStr = err.Error() @@ -210,7 +222,7 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo } } else if us.FileOffset < us.FileSize { // resume upload - written, err = a.AppendFile(lr, us.Path) + written, err = a.AppendFile(lr, uploadPath) } if written > 0 { us.FileOffset += written @@ -228,7 +240,7 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo } // upload is done, create FileInfo - file, err := a.FileReader(us.Path) + file, err := a.FileReader(uploadPath) if err != nil { return nil, model.NewAppError("UploadData", "app.upload.upload_data.read_file.app_error", nil, err.Error(), http.StatusInternalServerError) } @@ -260,13 +272,19 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo nameWithoutExtension := info.Name[:strings.LastIndex(info.Name, ".")] info.PreviewPath = filepath.Dir(info.Path) + "/" + nameWithoutExtension + "_preview.jpg" info.ThumbnailPath = filepath.Dir(info.Path) + "/" + nameWithoutExtension + "_thumb.jpg" - imgData, fileErr := a.ReadFile(us.Path) + imgData, fileErr := a.ReadFile(uploadPath) if fileErr != nil { return nil, fileErr } a.HandleImages([]string{info.PreviewPath}, []string{info.ThumbnailPath}, [][]byte{imgData}) } + if us.Type == model.UploadTypeImport { + if err := a.MoveFile(uploadPath, us.Path); err != nil { + return nil, model.NewAppError("UploadData", "app.upload.upload_data.move_file.app_error", nil, err.Error(), http.StatusInternalServerError) + } + } + var storeErr error if info, storeErr = a.Srv().Store.FileInfo().Save(info); storeErr != nil { var appErr *model.AppError diff --git a/i18n/en.json b/i18n/en.json index a3bba9f6d0e..f8cfbe6304d 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -1344,6 +1344,14 @@ "id": "api.file.file_exists.s3.app_error", "translation": "Unable to check if the file exists." }, + { + "id": "api.file.file_size.local.app_error", + "translation": "Unable to get the file size." + }, + { + "id": "api.file.file_size.s3.app_error", + "translation": "Unable to get the file size." + }, { "id": "api.file.get_file.public_invalid.app_error", "translation": "The public link does not appear to be valid." @@ -4254,6 +4262,10 @@ "id": "app.import.import_user_teams.save_preferences.error", "translation": "Unable to save the team theme preferences" }, + { + "id": "app.import.marshal.app_error", + "translation": "Unable to marshal response." + }, { "id": "app.import.process_import_data_file_version_line.invalid_version.error", "translation": "Unable to read the version of the data import file." @@ -5518,6 +5530,10 @@ "id": "app.upload.upload_data.large_image.app_error", "translation": "{{.Filename}} dimensions ({{.Width}} by {{.Height}} pixels) exceed the limits." }, + { + "id": "app.upload.upload_data.move_file.app_error", + "translation": "Failed to move uploaded file." + }, { "id": "app.upload.upload_data.read_file.app_error", "translation": "Failed to read a file." @@ -6626,6 +6642,30 @@ "id": "groups.unsupported_syncable_type", "translation": "Unsupported syncable type '{{.Value}}'." }, + { + "id": "import_process.worker.do_job.file_exists", + "translation": "Unable to process import: file does not exists." + }, + { + "id": "import_process.worker.do_job.missing_file", + "translation": "Unable to process import: import_file parameter is missing." + }, + { + "id": "import_process.worker.do_job.missing_jsonl", + "translation": "Unable to process import: JSONL file is missing." + }, + { + "id": "import_process.worker.do_job.open_file", + "translation": "Unable to process import: failed to open file." + }, + { + "id": "import_process.worker.do_job.tmp_dir", + "translation": "Unable to process import: failed to create temporary directory." + }, + { + "id": "import_process.worker.do_job.unzip", + "translation": "Unable to process import: failed to unzip file." + }, { "id": "interactive_message.decode_trigger_id.base64_decode_failed", "translation": "Failed to decode base64 for trigger ID for interactive dialog." @@ -7126,6 +7166,14 @@ "id": "model.config.is_valid.image_proxy_type.app_error", "translation": "Invalid image proxy type. Must be 'local' or 'atmos/camo'." }, + { + "id": "model.config.is_valid.import.directory.app_error", + "translation": "Invalid value for Directory." + }, + { + "id": "model.config.is_valid.import.retention_days_too_low.app_error", + "translation": "Invalid value for RetentionDays. Value is too low." + }, { "id": "model.config.is_valid.ldap_basedn", "translation": "AD/LDAP field \"BaseDN\" is required." diff --git a/imports/placeholder.go b/imports/placeholder.go index d345cc270b5..2eb8b1b15c4 100644 --- a/imports/placeholder.go +++ b/imports/placeholder.go @@ -21,4 +21,7 @@ import ( // This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty. _ "github.com/mattermost/mattermost-server/v5/jobs/product_notices" + + // This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty. + _ "github.com/mattermost/mattermost-server/v5/jobs/import_process" ) diff --git a/jobs/import_process/worker.go b/jobs/import_process/worker.go new file mode 100644 index 00000000000..cd6fffb281b --- /dev/null +++ b/jobs/import_process/worker.go @@ -0,0 +1,196 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package import_process + +import ( + "io" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "runtime" + "strconv" + + "github.com/mattermost/mattermost-server/v5/app" + "github.com/mattermost/mattermost-server/v5/jobs" + tjobs "github.com/mattermost/mattermost-server/v5/jobs/interfaces" + "github.com/mattermost/mattermost-server/v5/mlog" + "github.com/mattermost/mattermost-server/v5/model" + "github.com/mattermost/mattermost-server/v5/utils" +) + +func init() { + app.RegisterJobsImportProcessInterface(func(a *app.App) tjobs.ImportProcessInterface { + return &ImportProcessInterfaceImpl{a} + }) +} + +type ImportProcessInterfaceImpl struct { + app *app.App +} + +type ImportProcessWorker struct { + name string + stopChan chan struct{} + stoppedChan chan struct{} + jobsChan chan model.Job + jobServer *jobs.JobServer + app *app.App +} + +func (i *ImportProcessInterfaceImpl) MakeWorker() model.Worker { + return &ImportProcessWorker{ + name: "ImportProcess", + stopChan: make(chan struct{}), + stoppedChan: make(chan struct{}), + jobsChan: make(chan model.Job), + jobServer: i.app.Srv().Jobs, + app: i.app, + } +} + +func (w *ImportProcessWorker) JobChannel() chan<- model.Job { + return w.jobsChan +} + +func (w *ImportProcessWorker) Run() { + mlog.Debug("Worker started", mlog.String("worker", w.name)) + + defer func() { + mlog.Debug("Worker finished", mlog.String("worker", w.name)) + close(w.stoppedChan) + }() + + for { + select { + case <-w.stopChan: + mlog.Debug("Worker received stop signal", mlog.String("worker", w.name)) + return + case job := <-w.jobsChan: + mlog.Debug("Worker received a new candidate job.", mlog.String("worker", w.name)) + w.doJob(&job) + } + } +} + +func (w *ImportProcessWorker) Stop() { + mlog.Debug("Worker stopping", mlog.String("worker", w.name)) + close(w.stopChan) + <-w.stoppedChan +} + +func (w *ImportProcessWorker) doJob(job *model.Job) { + if claimed, err := w.jobServer.ClaimJob(job); err != nil { + mlog.Warn("Worker experienced an error while trying to claim job", + mlog.String("worker", w.name), + mlog.String("job_id", job.Id), + mlog.String("error", err.Error())) + return + } else if !claimed { + return + } + + importFileName, ok := job.Data["import_file"] + if !ok { + appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.missing_file", nil, "", http.StatusBadRequest) + w.setJobError(job, appError) + return + } + + importFilePath := filepath.Join(*w.app.Config().ImportSettings.Directory, importFileName) + if ok, err := w.app.FileExists(importFilePath); err != nil { + w.setJobError(job, err) + return + } else if !ok { + appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.file_exists", nil, "", http.StatusBadRequest) + w.setJobError(job, appError) + return + } + + importFileSize, appErr := w.app.FileSize(importFilePath) + if appErr != nil { + w.setJobError(job, appErr) + return + } + + importFile, appErr := w.app.FileReader(importFilePath) + if appErr != nil { + w.setJobError(job, appErr) + return + } + defer importFile.Close() + + // TODO (MM-30187): improve this process by eliminating the need to unzip the import + // file locally and instead do the whole bulk import process in memory by + // streaming the import file. + + // create a temporary dir to extract the zipped import file. + dir, err := ioutil.TempDir("", "import") + if err != nil { + appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.tmp_dir", nil, err.Error(), http.StatusInternalServerError) + w.setJobError(job, appError) + return + } + defer os.RemoveAll(dir) + + // extract the contents of the zipped file. + paths, err := utils.UnzipToPath(importFile.(io.ReaderAt), importFileSize, dir) + if err != nil { + appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.unzip", nil, err.Error(), http.StatusInternalServerError) + w.setJobError(job, appError) + return + } + + // find JSONL import file. + var jsonFilePath string + for _, path := range paths { + if filepath.Ext(path) == ".jsonl" { + jsonFilePath = path + break + } + } + + if jsonFilePath == "" { + appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.missing_jsonl", nil, "", http.StatusBadRequest) + w.setJobError(job, appError) + return + } + + jsonFile, err := os.Open(jsonFilePath) + if err != nil { + appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.open_file", nil, err.Error(), http.StatusInternalServerError) + w.setJobError(job, appError) + return + } + + // do the actual import. + appErr, lineNumber := w.app.BulkImportWithPath(jsonFile, false, runtime.NumCPU(), dir) + if appErr != nil { + job.Data["line_number"] = strconv.Itoa(lineNumber) + w.setJobError(job, appErr) + return + } + + // remove import file when done. + if appErr := w.app.RemoveFile(importFilePath); appErr != nil { + w.setJobError(job, appErr) + return + } + + mlog.Info("Worker: Job is complete", mlog.String("worker", w.name), mlog.String("job_id", job.Id)) + w.setJobSuccess(job) +} + +func (w *ImportProcessWorker) setJobSuccess(job *model.Job) { + if err := w.app.Srv().Jobs.SetJobSuccess(job); err != nil { + mlog.Error("Worker: Failed to set success for job", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + w.setJobError(job, err) + } +} + +func (w *ImportProcessWorker) setJobError(job *model.Job, appError *model.AppError) { + if err := w.app.Srv().Jobs.SetJobError(job, appError); err != nil { + mlog.Error("Worker: Failed to set job error", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + } +} diff --git a/jobs/interfaces/import_process_interface.go b/jobs/interfaces/import_process_interface.go new file mode 100644 index 00000000000..3be39974105 --- /dev/null +++ b/jobs/interfaces/import_process_interface.go @@ -0,0 +1,12 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package interfaces + +import ( + "github.com/mattermost/mattermost-server/v5/model" +) + +type ImportProcessInterface interface { + MakeWorker() model.Worker +} diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 42f9eb8d1a9..748e9fca23a 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -149,6 +149,13 @@ func (watcher *Watcher) PollAndNotify() { default: } } + } else if job.Type == model.JOB_TYPE_IMPORT_PROCESS { + if watcher.workers.ImportProcess != nil { + select { + case watcher.workers.ImportProcess.JobChannel() <- *job: + default: + } + } } else if job.Type == model.JOB_TYPE_CLOUD { if watcher.workers.Cloud != nil { select { diff --git a/jobs/server.go b/jobs/server.go index 212a3b80a2e..f69e0b1e65a 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -28,6 +28,7 @@ type JobServer struct { ExpiryNotify tjobs.ExpiryNotifyJobInterface ProductNotices tjobs.ProductNoticesJobInterface ActiveUsers tjobs.ActiveUsersJobInterface + ImportProcess tjobs.ImportProcessInterface Cloud ejobs.CloudJobInterface } diff --git a/jobs/workers.go b/jobs/workers.go index 759f8950667..1b18a94c2ed 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -27,6 +27,7 @@ type Workers struct { ExpiryNotify model.Worker ProductNotices model.Worker ActiveUsers model.Worker + ImportProcess model.Worker Cloud model.Worker listenerId string @@ -82,6 +83,10 @@ func (srv *JobServer) InitWorkers() *Workers { workers.ProductNotices = productNoticesInterface.MakeWorker() } + if importProcessInterface := srv.ImportProcess; importProcessInterface != nil { + workers.ImportProcess = importProcessInterface.MakeWorker() + } + if cloudInterface := srv.Cloud; cloudInterface != nil { workers.Cloud = cloudInterface.MakeWorker() } @@ -137,6 +142,10 @@ func (workers *Workers) Start() *Workers { go workers.ProductNotices.Run() } + if workers.ImportProcess != nil { + go workers.ImportProcess.Run() + } + if workers.Cloud != nil { go workers.Cloud.Run() } @@ -245,10 +254,15 @@ func (workers *Workers) Stop() *Workers { if workers.ActiveUsers != nil { workers.ActiveUsers.Stop() } + if workers.ProductNotices != nil { workers.ProductNotices.Stop() } + if workers.ImportProcess != nil { + workers.ImportProcess.Stop() + } + if workers.Cloud != nil { workers.Cloud.Stop() } diff --git a/model/client4.go b/model/client4.go index a6b1f52eb87..a6a76a959bf 100644 --- a/model/client4.go +++ b/model/client4.go @@ -543,6 +543,10 @@ func (c *Client4) GetGroupSyncablesRoute(groupID string, syncableType GroupSynca return fmt.Sprintf("%s/%ss", c.GetGroupRoute(groupID), strings.ToLower(syncableType.String())) } +func (c *Client4) GetImportsRoute() string { + return "/imports" +} + func (c *Client4) DoApiGet(url string, etag string) (*http.Response, *AppError) { return c.DoApiRequest(http.MethodGet, c.ApiUrl+url, "", etag) } @@ -5754,6 +5758,15 @@ func (c *Client4) UpdateCloudCustomerAddress(address *Address) (*CloudCustomer, return customer, BuildResponse(r) } +func (c *Client4) ListImports() ([]string, *Response) { + r, err := c.DoApiGet(c.GetImportsRoute(), "") + if err != nil { + return nil, BuildErrorResponse(r, err) + } + defer closeBody(r) + return ArrayFromJson(r.Body), BuildResponse(r) +} + func (c *Client4) GetUserThreads(userId string, options GetUserThreadsOpts) (*Threads, *Response) { v := url.Values{} if options.Since != 0 { diff --git a/model/config.go b/model/config.go index 35f62567bf5..6fe05937346 100644 --- a/model/config.go +++ b/model/config.go @@ -117,6 +117,9 @@ const ( FILE_SETTINGS_DEFAULT_DIRECTORY = "./data/" + IMPORT_SETTINGS_DEFAULT_DIRECTORY = "./import" + IMPORT_SETTINGS_DEFAULT_RETENTION_DAYS = 30 + EMAIL_SETTINGS_DEFAULT_FEEDBACK_ORGANIZATION = "" SUPPORT_SETTINGS_DEFAULT_TERMS_OF_SERVICE_LINK = "https://about.mattermost.com/default-terms/" @@ -2854,6 +2857,37 @@ func (s *ImageProxySettings) SetDefaults(ss ServiceSettings) { } } +// ImportSettings defines configuration settings for file imports. +type ImportSettings struct { + // The directory where to store the imported files. + Directory *string + // The number of days to retain the imported files before deleting them. + RetentionDays *int +} + +func (s *ImportSettings) isValid() *AppError { + if *s.Directory == "" { + return NewAppError("Config.IsValid", "model.config.is_valid.import.directory.app_error", nil, "", http.StatusBadRequest) + } + + if *s.RetentionDays <= 0 { + return NewAppError("Config.IsValid", "model.config.is_valid.import.retention_days_too_low.app_error", nil, "", http.StatusBadRequest) + } + + return nil +} + +// SetDefaults applies the default settings to the struct. +func (s *ImportSettings) SetDefaults() { + if s.Directory == nil || *s.Directory == "" { + s.Directory = NewString(IMPORT_SETTINGS_DEFAULT_DIRECTORY) + } + + if s.RetentionDays == nil { + s.RetentionDays = NewInt(IMPORT_SETTINGS_DEFAULT_RETENTION_DAYS) + } +} + type ConfigFunc func() *Config const ConfigAccessTagType = "access" @@ -2929,6 +2963,7 @@ type Config struct { ImageProxySettings ImageProxySettings CloudSettings CloudSettings FeatureFlags *FeatureFlags `json:",omitempty"` + ImportSettings ImportSettings } func (o *Config) Clone() *Config { @@ -3032,6 +3067,7 @@ func (o *Config) SetDefaults() { o.FeatureFlags = &FeatureFlags{} o.FeatureFlags.SetDefaults() } + o.ImportSettings.SetDefaults() } func (o *Config) IsValid() *AppError { @@ -3110,6 +3146,10 @@ func (o *Config) IsValid() *AppError { if err := o.ImageProxySettings.isValid(); err != nil { return err } + + if err := o.ImportSettings.isValid(); err != nil { + return err + } return nil } diff --git a/model/config_test.go b/model/config_test.go index eda49930075..b8142da37aa 100644 --- a/model/config_test.go +++ b/model/config_test.go @@ -1389,3 +1389,31 @@ func TestSetDefaultFeatureFlagBehaviour(t *testing.T) { require.Equal(t, "somevalue", cfg.FeatureFlags.TestFeature) } + +func TestConfigImportSettingsDefaults(t *testing.T) { + cfg := Config{} + cfg.SetDefaults() + + require.Equal(t, "./import", *cfg.ImportSettings.Directory) + require.Equal(t, 30, *cfg.ImportSettings.RetentionDays) +} + +func TestConfigImportSettingsIsValid(t *testing.T) { + cfg := Config{} + cfg.SetDefaults() + + err := cfg.ImportSettings.isValid() + require.Nil(t, err) + + *cfg.ImportSettings.Directory = "" + err = cfg.ImportSettings.isValid() + require.NotNil(t, err) + require.Equal(t, "model.config.is_valid.import.directory.app_error", err.Id) + + cfg.SetDefaults() + + *cfg.ImportSettings.RetentionDays = 0 + err = cfg.ImportSettings.isValid() + require.NotNil(t, err) + require.Equal(t, "model.config.is_valid.import.retention_days_too_low.app_error", err.Id) +} diff --git a/model/job.go b/model/job.go index 072bfb2bac5..b6c053a42df 100644 --- a/model/job.go +++ b/model/job.go @@ -22,6 +22,7 @@ const ( JOB_TYPE_EXPIRY_NOTIFY = "expiry_notify" JOB_TYPE_PRODUCT_NOTICES = "product_notices" JOB_TYPE_ACTIVE_USERS = "active_users" + JOB_TYPE_IMPORT_PROCESS = "import_process" JOB_TYPE_CLOUD = "cloud" JOB_STATUS_PENDING = "pending" @@ -66,6 +67,7 @@ func (j *Job) IsValid() *AppError { case JOB_TYPE_PRODUCT_NOTICES: case JOB_TYPE_EXPIRY_NOTIFY: case JOB_TYPE_ACTIVE_USERS: + case JOB_TYPE_IMPORT_PROCESS: case JOB_TYPE_CLOUD: default: return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest) diff --git a/model/upload_session.go b/model/upload_session.go index 663ee0b1746..05994b9e935 100644 --- a/model/upload_session.go +++ b/model/upload_session.go @@ -18,6 +18,9 @@ const ( UploadTypeImport UploadType = "import" ) +// UploadNoUserID is a "fake" user id used by the API layer when in local mode. +const UploadNoUserID = "nouser" + // UploadSession contains information used to keep track of a file upload. type UploadSession struct { // The unique identifier for the session. @@ -29,7 +32,7 @@ type UploadSession struct { // The id of the user performing the upload. UserId string `json:"user_id"` // The id of the channel to upload to. - ChannelId string `json:"channel_id"` + ChannelId string `json:"channel_id,omitempty"` // The name of the file to upload. Filename string `json:"filename"` // The path where the file is stored. @@ -109,7 +112,7 @@ func (us *UploadSession) IsValid() *AppError { return NewAppError("UploadSession.IsValid", "model.upload_session.is_valid.type.app_error", nil, err.Error(), http.StatusBadRequest) } - if !IsValidId(us.UserId) { + if !IsValidId(us.UserId) && us.UserId != UploadNoUserID { return NewAppError("UploadSession.IsValid", "model.upload_session.is_valid.user_id.app_error", nil, "id="+us.Id, http.StatusBadRequest) } diff --git a/services/filesstore/filesstore.go b/services/filesstore/filesstore.go index 7ca4c1c2d85..32335102852 100644 --- a/services/filesstore/filesstore.go +++ b/services/filesstore/filesstore.go @@ -21,6 +21,7 @@ type FileBackend interface { Reader(path string) (ReadCloseSeeker, *model.AppError) ReadFile(path string) ([]byte, *model.AppError) FileExists(path string) (bool, *model.AppError) + FileSize(path string) (int64, *model.AppError) CopyFile(oldPath, newPath string) *model.AppError MoveFile(oldPath, newPath string) *model.AppError WriteFile(fr io.Reader, path string) (int64, *model.AppError) diff --git a/services/filesstore/filesstore_test.go b/services/filesstore/filesstore_test.go index 3bb5f2298ff..c4ae492e40d 100644 --- a/services/filesstore/filesstore_test.go +++ b/services/filesstore/filesstore_test.go @@ -7,6 +7,7 @@ import ( "bytes" "fmt" "io/ioutil" + "math/rand" "os" "testing" @@ -365,6 +366,28 @@ func (s *FileBackendTestSuite) TestAppendFile() { }) } +func (s *FileBackendTestSuite) TestFileSize() { + s.Run("nonexistent file", func() { + size, err := s.backend.FileSize("tests/nonexistentfile") + s.NotNil(err) + s.Zero(size) + }) + + s.Run("valid file", func() { + data := make([]byte, rand.Intn(1024*1024)+1) + path := "tests/" + model.NewId() + + written, err := s.backend.WriteFile(bytes.NewReader(data), path) + s.Nil(err) + s.EqualValues(len(data), written) + defer s.backend.RemoveFile(path) + + size, err := s.backend.FileSize(path) + s.Nil(err) + s.Equal(int64(len(data)), size) + }) +} + func BenchmarkS3WriteFile(b *testing.B) { utils.TranslationsPreInit() diff --git a/services/filesstore/localstore.go b/services/filesstore/localstore.go index 7ecbf669f3f..0ebd7685450 100644 --- a/services/filesstore/localstore.go +++ b/services/filesstore/localstore.go @@ -63,6 +63,14 @@ func (b *LocalFileBackend) FileExists(path string) (bool, *model.AppError) { return true, nil } +func (b *LocalFileBackend) FileSize(path string) (int64, *model.AppError) { + info, err := os.Stat(filepath.Join(b.directory, path)) + if err != nil { + return 0, model.NewAppError("FileSize", "api.file.file_size.local.app_error", nil, err.Error(), http.StatusInternalServerError) + } + return info.Size(), nil +} + func (b *LocalFileBackend) CopyFile(oldPath, newPath string) *model.AppError { if err := utils.CopyFile(filepath.Join(b.directory, oldPath), filepath.Join(b.directory, newPath)); err != nil { return model.NewAppError("copyFile", "api.file.move_file.rename.app_error", nil, err.Error(), http.StatusInternalServerError) diff --git a/services/filesstore/s3store.go b/services/filesstore/s3store.go index f0348bda65a..687e228adab 100644 --- a/services/filesstore/s3store.go +++ b/services/filesstore/s3store.go @@ -195,6 +195,17 @@ func (b *S3FileBackend) FileExists(path string) (bool, *model.AppError) { return false, model.NewAppError("FileExists", "api.file.file_exists.s3.app_error", nil, err.Error(), http.StatusInternalServerError) } +func (b *S3FileBackend) FileSize(path string) (int64, *model.AppError) { + path = filepath.Join(b.pathPrefix, path) + + info, err := b.client.StatObject(context.Background(), b.bucket, path, s3.StatObjectOptions{}) + if err != nil { + return 0, model.NewAppError("FileSize", "api.file.file_size.s3.app_error", nil, err.Error(), http.StatusInternalServerError) + } + + return info.Size, nil +} + func (b *S3FileBackend) CopyFile(oldPath, newPath string) *model.AppError { oldPath = filepath.Join(b.pathPrefix, oldPath) newPath = filepath.Join(b.pathPrefix, newPath) diff --git a/store/sqlstore/upload_session_store.go b/store/sqlstore/upload_session_store.go index bc7fe4c5188..e8baed02eb0 100644 --- a/store/sqlstore/upload_session_store.go +++ b/store/sqlstore/upload_session_store.go @@ -93,9 +93,6 @@ func (us SqlUploadSessionStore) Get(id string) (*model.UploadSession, error) { } func (us SqlUploadSessionStore) GetForUser(userId string) ([]*model.UploadSession, error) { - if !model.IsValidId(userId) { - return nil, errors.New("SqlUploadSessionStore.GetForUser: userId is not valid") - } query := us.getQueryBuilder(). Select("*"). From("UploadSessions"). diff --git a/store/storetest/upload_session_store.go b/store/storetest/upload_session_store.go index df946052eb9..0b9c4c0e3fb 100644 --- a/store/storetest/upload_session_store.go +++ b/store/storetest/upload_session_store.go @@ -145,12 +145,6 @@ func testUploadSessionStoreGetForUser(t *testing.T, ss store.Store) { }, } - t.Run("getting invalid userId should fail", func(t *testing.T) { - us, err := ss.UploadSession().GetForUser("invalidId") - require.Error(t, err) - require.Nil(t, us) - }) - t.Run("should return no sessions", func(t *testing.T) { us, err := ss.UploadSession().GetForUser(userId) require.NoError(t, err) diff --git a/tests/import_test.zip b/tests/import_test.zip new file mode 100644 index 00000000000..79fe1fab278 Binary files /dev/null and b/tests/import_test.zip differ diff --git a/tests/testarchive.zip b/tests/testarchive.zip new file mode 100644 index 00000000000..f77b5dae011 Binary files /dev/null and b/tests/testarchive.zip differ diff --git a/utils/archive.go b/utils/archive.go new file mode 100644 index 00000000000..cf8d740e4cf --- /dev/null +++ b/utils/archive.go @@ -0,0 +1,65 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package utils + +import ( + "archive/zip" + "fmt" + "io" + "os" + "path/filepath" + "strings" +) + +func sanitizePath(p string) string { + dir := strings.ReplaceAll(filepath.Dir(filepath.Clean(p)), "..", "") + base := filepath.Base(p) + if strings.Count(base, ".") == len(base) { + return "" + } + return filepath.Join(dir, base) +} + +// UnzipToPath extracts a given zip archive into a given path. +// It returns a list of extracted paths. +func UnzipToPath(zipFile io.ReaderAt, size int64, outPath string) ([]string, error) { + rd, err := zip.NewReader(zipFile, size) + if err != nil { + return nil, fmt.Errorf("failed to create reader: %w", err) + } + + paths := make([]string, len(rd.File)) + for i, f := range rd.File { + filePath := sanitizePath(f.Name) + if filePath == "" { + return nil, fmt.Errorf("invalid filepath `%s`", f.Name) + } + path := filepath.Join(outPath, filePath) + paths[i] = path + if f.FileInfo().IsDir() { + if err := os.Mkdir(path, 0744); err != nil { + return nil, fmt.Errorf("failed to create directory: %w", err) + } + continue + } + + outFile, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return nil, fmt.Errorf("failed to create file: %w", err) + } + defer outFile.Close() + + file, err := f.Open() + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + if _, err := io.Copy(outFile, file); err != nil { + return nil, fmt.Errorf("failed to write to file: %w", err) + } + } + + return paths, nil +} diff --git a/utils/archive_test.go b/utils/archive_test.go new file mode 100644 index 00000000000..f19a477c8bc --- /dev/null +++ b/utils/archive_test.go @@ -0,0 +1,149 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package utils + +import ( + "archive/zip" + "errors" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/mattermost/mattermost-server/v5/utils/fileutils" + + "github.com/stretchr/testify/require" +) + +func TestSanitizePath(t *testing.T) { + cases := []struct { + input string + expected string + }{ + { + ".", + "", + }, + { + "../", + "", + }, + { + "...", + "", + }, + { + "..//.", + "", + }, + { + "/../", + "", + }, + { + "/path/...../to/file", + "/path/to/file", + }, + { + "/path/to/file...", + "/path/to/file...", + }, + { + "/path/to/../../../file", + "/file", + }, + { + "../../../../file", + "/file", + }, + { + "/path/to/file..ext", + "/path/to/file..ext", + }, + { + "/path/to/...file..ext", + "/path/to/...file..ext", + }, + { + "./path/to/...file..ext", + "path/to/...file..ext", + }, + { + "./...file", + "...file", + }, + { + "path/", + "path", + }, + } + + for _, c := range cases { + t.Run(c.input, func(t *testing.T) { + require.Equal(t, c.expected, sanitizePath(c.input)) + }) + } +} + +func TestUnzipToPath(t *testing.T) { + testDir, _ := fileutils.FindDir("tests") + require.NotEmpty(t, testDir) + + dir, err := ioutil.TempDir("", "unzip") + require.Nil(t, err) + defer os.RemoveAll(dir) + + t.Run("invalid archive", func(t *testing.T) { + file, err := os.Open(testDir + "/testplugin.tar.gz") + require.Nil(t, err) + defer file.Close() + + info, err := file.Stat() + require.Nil(t, err) + + paths, err := UnzipToPath(file, info.Size(), dir) + require.NotNil(t, err) + require.True(t, errors.Is(err, zip.ErrFormat)) + require.Nil(t, paths) + }) + + t.Run("valid archive", func(t *testing.T) { + file, err := os.Open(testDir + "/testarchive.zip") + require.Nil(t, err) + defer file.Close() + + info, err := file.Stat() + require.Nil(t, err) + + paths, err := UnzipToPath(file, info.Size(), dir) + require.Nil(t, err) + require.NotEmpty(t, paths) + + expectedFiles := map[string]int64{ + dir + "/testfile.txt": 446, + dir + "/testdir/testfile2.txt": 866, + dir + "/testdir2/testfile3.txt": 845, + } + + expectedDirs := []string{ + dir + "/testdir", + dir + "/testdir2", + } + + err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + require.Nil(t, err) + if path == dir { + return nil + } + require.Contains(t, paths, path) + if info.IsDir() { + require.Contains(t, expectedDirs, path) + } else { + require.Equal(t, expectedFiles[path], info.Size()) + } + return nil + }) + require.Nil(t, err) + }) +}