[MM-28422] Enable processing of import files through API (#16062)

* Implement unzip function

* Implement FileSize method

* Implement path rewriting for bulk import

* Small improvements

* Add ImportSettings to config

* Implement ListImports API endpoint

* Enable uploading import files

* Implement import process job

* Add missing license headers

* Address reviews

* Make path sanitization a bit smarter

* Clean path before calculating Dir

* [MM-30008] Add mmctl support for file imports (#16301)

* Add mmctl support for import files

* Improve test

* Remove unnecessary handlers

* Use th.TestForSystemAdminAndLocal

* Make nouser id a constant
This commit is contained in:
Claudio Costa 2020-12-03 11:38:00 +01:00 committed by GitHub
parent f733ee9332
commit df906cad9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 1205 additions and 36 deletions

View file

@ -123,6 +123,8 @@ type Routes struct {
Groups *mux.Router // 'api/v4/groups'
Cloud *mux.Router // 'api/v4/cloud'
Imports *mux.Router // 'api/v4/imports'
}
type API struct {
@ -235,6 +237,8 @@ func Init(configservice configservice.ConfigService, globalOptionsFunc app.AppOp
api.BaseRoutes.Cloud = api.BaseRoutes.ApiRoot.PathPrefix("/cloud").Subrouter()
api.BaseRoutes.Imports = api.BaseRoutes.ApiRoot.PathPrefix("/imports").Subrouter()
api.InitUser()
api.InitBot()
api.InitTeam()
@ -271,6 +275,7 @@ func Init(configservice configservice.ConfigService, globalOptionsFunc app.AppOp
api.InitGroup()
api.InitAction()
api.InitCloud()
api.InitImport()
root.Handle("/api/v4/{anything:.*}", http.HandlerFunc(api.Handle404))
@ -335,6 +340,13 @@ func InitLocal(configservice configservice.ConfigService, globalOptionsFunc app.
api.BaseRoutes.Roles = api.BaseRoutes.ApiRoot.PathPrefix("/roles").Subrouter()
api.BaseRoutes.Uploads = api.BaseRoutes.ApiRoot.PathPrefix("/uploads").Subrouter()
api.BaseRoutes.Upload = api.BaseRoutes.Uploads.PathPrefix("/{upload_id:[A-Za-z0-9]+}").Subrouter()
api.BaseRoutes.Imports = api.BaseRoutes.ApiRoot.PathPrefix("/imports").Subrouter()
api.BaseRoutes.Jobs = api.BaseRoutes.ApiRoot.PathPrefix("/jobs").Subrouter()
api.InitUserLocal()
api.InitTeamLocal()
api.InitChannelLocal()
@ -349,6 +361,9 @@ func InitLocal(configservice configservice.ConfigService, globalOptionsFunc app.
api.InitSystemLocal()
api.InitPostLocal()
api.InitRoleLocal()
api.InitUploadLocal()
api.InitImportLocal()
api.InitJobLocal()
root.Handle("/api/v4/{anything:.*}", http.HandlerFunc(api.Handle404))

36
api4/import.go Normal file
View file

@ -0,0 +1,36 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package api4
import (
"encoding/json"
"net/http"
"github.com/mattermost/mattermost-server/v5/model"
)
// InitImport registers the import-related API routes on the
// session-required router.
func (api *API) InitImport() {
	listHandler := api.ApiSessionRequired(listImports)
	api.BaseRoutes.Imports.Handle("", listHandler).Methods("GET")
}
// listImports responds with the JSON-encoded list of available import
// files. Only system administrators may call it.
func listImports(c *Context, w http.ResponseWriter, r *http.Request) {
	// Imports can contain sensitive data; restrict to system admins.
	if !c.IsSystemAdmin() {
		c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
		return
	}

	imports, appErr := c.App.ListImports()
	if appErr != nil {
		c.Err = appErr
		return
	}

	var data []byte
	var err error
	if data, err = json.Marshal(imports); err != nil {
		c.Err = model.NewAppError("listImports", "app.import.marshal.app_error", nil, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write(data)
}

8
api4/import_local.go Normal file
View file

@ -0,0 +1,8 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package api4
// InitImportLocal registers the import routes on the local
// (unix socket / local mode) router.
func (api *API) InitImportLocal() {
	handler := api.ApiLocal(listImports)
	api.BaseRoutes.Imports.Handle("", handler).Methods("GET")
}

106
api4/import_test.go Normal file
View file

@ -0,0 +1,106 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package api4
import (
"os"
"path/filepath"
"testing"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/utils/fileutils"
"github.com/stretchr/testify/require"
)
// TestListImports exercises the /imports listing endpoint: permission
// enforcement, an empty listing, uploaded imports appearing in the list
// (with in-progress ".tmp" uploads filtered out), and honoring a custom
// import directory from config.
func TestListImports(t *testing.T) {
	th := Setup(t)
	defer th.TearDown()

	testsDir, _ := fileutils.FindDir("tests")
	require.NotEmpty(t, testsDir)

	// uploadNewImport creates an import-type upload session for the test
	// archive, uploads its data, and returns the upload id.
	uploadNewImport := func(c *model.Client4, t *testing.T) string {
		file, err := os.Open(testsDir + "/import_test.zip")
		require.Nil(t, err)
		// Close the archive handle when done (previously leaked).
		defer file.Close()

		info, err := file.Stat()
		require.Nil(t, err)

		us := &model.UploadSession{
			Filename: info.Name(),
			FileSize: info.Size(),
			Type:     model.UploadTypeImport,
		}
		// Local-mode requests carry no authenticated user.
		if c == th.LocalClient {
			us.UserId = model.UploadNoUserID
		}
		u, resp := c.CreateUpload(us)
		require.Nil(t, resp.Error)
		require.NotNil(t, u)

		finfo, resp := c.UploadData(u.Id, file)
		require.Nil(t, resp.Error)
		require.NotNil(t, finfo)

		return u.Id
	}

	t.Run("no permissions", func(t *testing.T) {
		imports, resp := th.Client.ListImports()
		require.Error(t, resp.Error)
		require.Equal(t, "api.context.permissions.app_error", resp.Error.Id)
		require.Nil(t, imports)
	})

	dataDir, found := fileutils.FindDir("data")
	require.True(t, found)

	th.TestForSystemAdminAndLocal(t, func(t *testing.T, c *model.Client4) {
		imports, resp := c.ListImports()
		require.Nil(t, resp.Error)
		require.Empty(t, imports)
	}, "no imports")

	th.TestForSystemAdminAndLocal(t, func(t *testing.T, c *model.Client4) {
		id := uploadNewImport(c, t)
		id2 := uploadNewImport(c, t)

		// An incomplete upload (.tmp suffix) must not show up in the list.
		importDir := filepath.Join(dataDir, "import")
		f, err := os.Create(filepath.Join(importDir, "import.zip.tmp"))
		require.Nil(t, err)
		f.Close()

		imports, resp := c.ListImports()
		require.Nil(t, resp.Error)
		require.NotEmpty(t, imports)
		require.Len(t, imports, 2)
		require.Contains(t, imports, id+"_import_test.zip")
		require.Contains(t, imports, id2+"_import_test.zip")

		require.Nil(t, os.RemoveAll(importDir))
	}, "expected imports")

	th.TestForSystemAdminAndLocal(t, func(t *testing.T, c *model.Client4) {
		th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ImportSettings.Directory = "import_new" })
		defer th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ImportSettings.Directory = "import" })
		importDir := filepath.Join(dataDir, "import_new")

		imports, resp := c.ListImports()
		require.Nil(t, resp.Error)
		require.Empty(t, imports)

		id := uploadNewImport(c, t)
		imports, resp = c.ListImports()
		require.Nil(t, resp.Error)
		require.NotEmpty(t, imports)
		require.Len(t, imports, 1)
		require.Equal(t, id+"_import_test.zip", imports[0])

		require.Nil(t, os.RemoveAll(importDir))
	}, "change import directory")
}

12
api4/job_local.go Normal file
View file

@ -0,0 +1,12 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package api4
// InitJobLocal registers job management routes on the local router,
// reusing the regular API handlers behind local-mode authentication.
func (api *API) InitJobLocal() {
	jobsRouter := api.BaseRoutes.Jobs
	jobsRouter.Handle("", api.ApiLocal(getJobs)).Methods("GET")
	jobsRouter.Handle("", api.ApiLocal(createJob)).Methods("POST")
	jobsRouter.Handle("/{job_id:[A-Za-z0-9]+}", api.ApiLocal(getJob)).Methods("GET")
	jobsRouter.Handle("/{job_id:[A-Za-z0-9]+}/cancel", api.ApiLocal(cancelJob)).Methods("POST")
	jobsRouter.Handle("/type/{job_type:[A-Za-z0-9_-]+}", api.ApiLocal(getJobsByType)).Methods("GET")
}

View file

@ -37,14 +37,23 @@ func createUpload(c *Context, w http.ResponseWriter, r *http.Request) {
defer c.LogAuditRec(auditRec)
auditRec.AddMeta("upload", us)
if !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) {
c.SetPermissionError(model.PERMISSION_UPLOAD_FILE)
return
if us.Type == model.UploadTypeImport {
if !c.IsSystemAdmin() {
c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
return
}
} else {
if !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) {
c.SetPermissionError(model.PERMISSION_UPLOAD_FILE)
return
}
us.Type = model.UploadTypeAttachment
}
us.Id = model.NewId()
us.Type = model.UploadTypeAttachment
us.UserId = c.App.Session().UserId
if c.App.Session().UserId != "" {
us.UserId = c.App.Session().UserId
}
us, err := c.App.CreateUploadSession(us)
if err != nil {
c.Err = err
@ -68,7 +77,7 @@ func getUpload(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
if us.UserId != c.App.Session().UserId {
if us.UserId != c.App.Session().UserId && !c.IsSystemAdmin() {
c.Err = model.NewAppError("getUpload", "api.upload.get_upload.forbidden.app_error", nil, "", http.StatusForbidden)
return
}
@ -98,9 +107,16 @@ func uploadData(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
if us.UserId != c.App.Session().UserId || !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) {
c.SetPermissionError(model.PERMISSION_UPLOAD_FILE)
return
if us.Type == model.UploadTypeImport {
if !c.IsSystemAdmin() {
c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
return
}
} else {
if us.UserId != c.App.Session().UserId || !c.App.SessionHasPermissionToChannel(*c.App.Session(), us.ChannelId, model.PERMISSION_UPLOAD_FILE) {
c.SetPermissionError(model.PERMISSION_UPLOAD_FILE)
return
}
}
boundary, parseErr := parseMultipartRequestHeader(r)

10
api4/upload_local.go Normal file
View file

@ -0,0 +1,10 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package api4
// InitUploadLocal registers upload session routes on the local router.
func (api *API) InitUploadLocal() {
	uploads := api.BaseRoutes.Uploads
	upload := api.BaseRoutes.Upload
	uploads.Handle("", api.ApiLocal(createUpload)).Methods("POST")
	upload.Handle("", api.ApiLocal(getUpload)).Methods("GET")
	upload.Handle("", api.ApiLocal(uploadData)).Methods("POST")
}

View file

@ -8,9 +8,11 @@ import (
"io"
"mime/multipart"
"net/http"
"os"
"testing"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/utils/fileutils"
"github.com/stretchr/testify/require"
)
@ -51,6 +53,41 @@ func TestCreateUpload(t *testing.T) {
require.NotEmpty(t, u)
require.Equal(t, http.StatusCreated, resp.StatusCode)
})
t.Run("import file", func(t *testing.T) {
testsDir, _ := fileutils.FindDir("tests")
importFile, err := os.Open(testsDir + "/import_test.zip")
require.Nil(t, err)
defer importFile.Close()
info, err := importFile.Stat()
require.Nil(t, err)
t.Run("permissions error", func(t *testing.T) {
us := &model.UploadSession{
Filename: info.Name(),
FileSize: info.Size(),
Type: model.UploadTypeImport,
}
u, resp := th.Client.CreateUpload(us)
require.Nil(t, u)
require.Error(t, resp.Error)
require.Equal(t, "api.context.permissions.app_error", resp.Error.Id)
require.Equal(t, http.StatusForbidden, resp.StatusCode)
})
t.Run("success", func(t *testing.T) {
us := &model.UploadSession{
Filename: info.Name(),
FileSize: info.Size(),
Type: model.UploadTypeImport,
}
u, resp := th.SystemAdminClient.CreateUpload(us)
require.Nil(t, resp.Error)
require.NotEmpty(t, u)
})
})
}
func TestGetUpload(t *testing.T) {

View file

@ -38,6 +38,8 @@ func (api *API) InitUserLocal() {
api.BaseRoutes.Users.Handle("/migrate_auth/ldap", api.ApiLocal(migrateAuthToLDAP)).Methods("POST")
api.BaseRoutes.Users.Handle("/migrate_auth/saml", api.ApiLocal(migrateAuthToSaml)).Methods("POST")
api.BaseRoutes.User.Handle("/uploads", api.ApiLocal(localGetUploadsForUser)).Methods("GET")
}
func localGetUsers(c *Context, w http.ResponseWriter, r *http.Request) {
@ -315,3 +317,13 @@ func localGetUserByEmail(c *Context, w http.ResponseWriter, r *http.Request) {
w.Header().Set(model.HEADER_ETAG_SERVER, etag)
w.Write([]byte(user.ToJson()))
}
// localGetUploadsForUser writes the upload sessions belonging to the
// user id in the request params as a JSON array.
func localGetUploadsForUser(c *Context, w http.ResponseWriter, r *http.Request) {
	sessions, appErr := c.App.GetUploadSessionsForUser(c.Params.UserId)
	if appErr != nil {
		c.Err = appErr
		return
	}
	w.Write([]byte(model.UploadSessionsToJson(sessions)))
}

View file

@ -115,6 +115,9 @@ func (a *App) initJobs() {
if productNoticesJobInterface != nil {
a.srv.Jobs.ProductNotices = productNoticesJobInterface(a)
}
if jobsImportProcessInterface != nil {
a.srv.Jobs.ImportProcess = jobsImportProcessInterface(a)
}
if jobsActiveUsersInterface != nil {
a.srv.Jobs.ActiveUsers = jobsActiveUsersInterface(a)

View file

@ -385,6 +385,7 @@ type AppIface interface {
BuildSamlMetadataObject(idpMetadata []byte) (*model.SamlMetadataResponse, *model.AppError)
BulkExport(writer io.Writer, file string, pathToEmojiDir string, dirNameToExportEmoji string) *model.AppError
BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int)
BulkImportWithPath(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int)
CancelJob(jobId string) *model.AppError
ChannelMembersToAdd(since int64, channelID *string) ([]*model.UserChannelIDPair, *model.AppError)
ChannelMembersToRemove(teamID *string) ([]*model.ChannelMember, *model.AppError)
@ -494,6 +495,7 @@ type AppIface interface {
FetchSamlMetadataFromIdp(url string) ([]byte, *model.AppError)
FileBackend() (filesstore.FileBackend, *model.AppError)
FileExists(path string) (bool, *model.AppError)
FileSize(path string) (int64, *model.AppError)
FillInChannelProps(channel *model.Channel) *model.AppError
FillInChannelsProps(channelList *model.ChannelList) *model.AppError
FilterUsersByVisible(viewer *model.User, otherUsers []*model.User) ([]*model.User, *model.AppError)
@ -770,6 +772,7 @@ type AppIface interface {
LimitedClientConfig() map[string]string
ListAllCommands(teamId string, T goi18n.TranslateFunc) ([]*model.Command, *model.AppError)
ListDirectory(path string) ([]string, *model.AppError)
ListImports() ([]string, *model.AppError)
ListPluginKeys(pluginId string, page, perPage int) ([]string, *model.AppError)
ListTeamCommands(teamId string) ([]*model.Command, *model.AppError)
Log() *mlog.Logger

View file

@ -108,6 +108,12 @@ func RegisterJobsExpiryNotifyJobInterface(f func(*App) tjobs.ExpiryNotifyJobInte
jobsExpiryNotifyInterface = f
}
var jobsImportProcessInterface func(*App) tjobs.ImportProcessInterface
func RegisterJobsImportProcessInterface(f func(*App) tjobs.ImportProcessInterface) {
jobsImportProcessInterface = f
}
var productNoticesJobInterface func(*App) tjobs.ProductNoticesJobInterface
func RegisterProductNoticesJobInterface(f func(*App) tjobs.ProductNoticesJobInterface) {

View file

@ -101,6 +101,14 @@ func (a *App) FileExists(path string) (bool, *model.AppError) {
return backend.FileExists(path)
}
// FileSize returns the size in bytes of the file at path on the
// configured file storage backend.
func (a *App) FileSize(path string) (int64, *model.AppError) {
	backend, appErr := a.FileBackend()
	if appErr != nil {
		return 0, appErr
	}
	return backend.FileSize(path)
}
func (a *App) MoveFile(oldPath, newPath string) *model.AppError {
backend, err := a.FileBackend()
if err != nil {

View file

@ -5,9 +5,11 @@ package app
import (
"bufio"
"bytes"
"encoding/json"
"io"
"net/http"
"path/filepath"
"strings"
"sync"
@ -29,6 +31,46 @@ func stopOnError(err LineImportWorkerError) bool {
return true
}
// rewriteAttachmentPaths prefixes every non-nil attachment path in files
// with basePath. A nil slice pointer is a no-op.
func rewriteAttachmentPaths(files *[]AttachmentImportData, basePath string) {
	if files == nil {
		return
	}
	attachments := *files
	for i := range attachments {
		if p := attachments[i].Path; p != nil {
			*p = filepath.Join(basePath, *p)
		}
	}
}
// rewriteFilePaths prefixes all file references held by the given import
// line — post/direct_post attachments (including reply attachments), user
// profile images, and emoji images — with basePath, so paths relative to
// an extracted import archive resolve correctly.
//
// Import lines come from JSON-decoded user input, so the payload pointer
// matching line.Type may be nil; nil payloads are skipped instead of
// panicking (previously a malformed line caused a nil dereference).
func rewriteFilePaths(line *LineImportData, basePath string) {
	switch line.Type {
	case "post", "direct_post":
		var replies []ReplyImportData
		if line.Type == "direct_post" {
			if line.DirectPost != nil {
				rewriteAttachmentPaths(line.DirectPost.Attachments, basePath)
				if line.DirectPost.Replies != nil {
					replies = *line.DirectPost.Replies
				}
			}
		} else if line.Post != nil {
			rewriteAttachmentPaths(line.Post.Attachments, basePath)
			if line.Post.Replies != nil {
				replies = *line.Post.Replies
			}
		}
		for _, reply := range replies {
			rewriteAttachmentPaths(reply.Attachments, basePath)
		}
	case "user":
		if line.User != nil && line.User.ProfileImage != nil {
			*line.User.ProfileImage = filepath.Join(basePath, *line.User.ProfileImage)
		}
	case "emoji":
		if line.Emoji != nil && line.Emoji.Image != nil {
			*line.Emoji.Image = filepath.Join(basePath, *line.Emoji.Image)
		}
	}
}
func (a *App) bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan LineImportWorkerData, errors chan<- LineImportWorkerError) {
postLines := []LineImportWorkerData{}
directPostLines := []LineImportWorkerData{}
@ -77,6 +119,14 @@ func (a *App) bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan Lin
}
// BulkImport runs a bulk import from fileReader without rewriting any
// file paths (empty base import path), preserving the pre-existing
// behavior of this API.
func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) {
	return a.bulkImport(fileReader, dryRun, workers, "")
}
// BulkImportWithPath runs a bulk import from fileReader, rewriting the
// file paths referenced by import lines relative to importPath (the
// directory the import archive was extracted to).
func (a *App) BulkImportWithPath(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int) {
	return a.bulkImport(fileReader, dryRun, workers, importPath)
}
func (a *App) bulkImport(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int) {
scanner := bufio.NewScanner(fileReader)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, maxScanTokenSize)
@ -92,7 +142,7 @@ func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model
lastLineType := ""
for scanner.Scan() {
decoder := json.NewDecoder(strings.NewReader(scanner.Text()))
decoder := json.NewDecoder(bytes.NewReader(scanner.Bytes()))
lineNumber++
var line LineImportData
@ -100,6 +150,10 @@ func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model
return model.NewAppError("BulkImport", "app.import.bulk_import.json_decode.error", nil, err.Error(), http.StatusBadRequest), lineNumber
}
if importPath != "" {
rewriteFilePaths(&line, importPath)
}
if lineNumber == 1 {
importDataFileVersion, appErr := processImportDataFileVersionLine(line)
if appErr != nil {
@ -214,3 +268,20 @@ func (a *App) importLine(line LineImportData, dryRun bool) *model.AppError {
return model.NewAppError("BulkImport", "app.import.import_line.unknown_line_type.error", map[string]interface{}{"Type": line.Type}, "", http.StatusBadRequest)
}
}
// ListImports returns the base names of the import files found in the
// configured import directory, skipping in-progress uploads (files
// carrying the incomplete-upload suffix).
func (a *App) ListImports() ([]string, *model.AppError) {
	entries, appErr := a.ListDirectory(*a.Config().ImportSettings.Directory)
	if appErr != nil {
		return nil, appErr
	}

	results := make([]string, 0, len(entries))
	for _, entry := range entries {
		name := filepath.Base(entry)
		if strings.HasSuffix(name, incompleteUploadSuffix) {
			continue
		}
		results = append(results, name)
	}
	return results, nil
}

View file

@ -532,6 +532,7 @@ func (a *App) importUser(data *UserImportData, dryRun bool) *model.AppError {
if err != nil {
mlog.Error("Unable to open the profile image.", mlog.Any("err", err))
}
defer file.Close()
if err := a.SetProfileImageFromMultiPartFile(savedUser.Id, file); err != nil {
mlog.Error("Unable to set the profile image from a file.", mlog.Any("err", err))
}
@ -1740,6 +1741,7 @@ func (a *App) importEmoji(data *EmojiImportData, dryRun bool) *model.AppError {
if err != nil {
return model.NewAppError("BulkImport", "app.import.emoji.bad_file.error", map[string]interface{}{"EmojiName": *data.Name}, "", http.StatusBadRequest)
}
defer file.Close()
if _, err := a.WriteFile(file, getEmojiImagePath(emoji.Id)); err != nil {
return err

View file

@ -4,8 +4,11 @@
package app
import (
"io/ioutil"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
@ -13,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/utils"
"github.com/mattermost/mattermost-server/v5/utils/fileutils"
)
@ -192,7 +196,7 @@ func TestImportBulkImport(t *testing.T) {
// Run bulk import using a valid and large input and a \r\n line break.
t.Run("", func(t *testing.T) {
posts := `{"type": "post"` + strings.Repeat(`, "post": {"team": "`+teamName+`", "channel": "`+channelName+`", "user": "`+username+`", "message": "Repeat after me", "create_at": 193456789012}`, 1E4) + "}"
posts := `{"type": "post"` + strings.Repeat(`, "post": {"team": "`+teamName+`", "channel": "`+channelName+`", "user": "`+username+`", "message": "Repeat after me", "create_at": 193456789012}`, 1e4) + "}"
data4 := `{"type": "version", "version": 1}
{"type": "team", "team": {"type": "O", "display_name": "lskmw2d7a5ao7ppwqh5ljchvr4", "name": "` + teamName + `"}}
{"type": "channel", "channel": {"type": "O", "display_name": "xr6m6udffngark2uekvr3hoeny", "team": "` + teamName + `", "name": "` + channelName + `"}}
@ -263,3 +267,124 @@ func AssertFileIdsInPost(files []*model.FileInfo, th *TestHelper, t *testing.T)
assert.Contains(t, posts[0].FileIds, file.Id)
}
}
// TestRewriteFilePaths verifies path rewriting of import lines: post and
// direct_post attachments, user profile images, and emoji images.
//
// NOTE: the subtests intentionally share and mutate the same line/line2
// values in order — "empty path" runs first (a no-op rewrite) and "valid
// path" then rewrites the same data. Do not reorder the subtests.
func TestRewriteFilePaths(t *testing.T) {
	// genAttachments builds a fresh attachment list with one bare filename
	// and one nested relative path.
	genAttachments := func() *[]AttachmentImportData {
		return &[]AttachmentImportData{
			{
				Path: model.NewString("file.jpg"),
			},
			{
				Path: model.NewString("somedir/file.jpg"),
			},
		}
	}
	line := LineImportData{
		Type: "post",
		Post: &PostImportData{
			Attachments: genAttachments(),
		},
	}
	line2 := LineImportData{
		Type: "direct_post",
		DirectPost: &DirectPostImportData{
			Attachments: genAttachments(),
		},
	}
	userLine := LineImportData{
		Type: "user",
		User: &UserImportData{
			ProfileImage: model.NewString("profile.jpg"),
		},
	}
	emojiLine := LineImportData{
		Type: "emoji",
		Emoji: &EmojiImportData{
			Image: model.NewString("emoji.png"),
		},
	}
	t.Run("empty path", func(t *testing.T) {
		// With an empty base path the attachments must stay unchanged.
		expected := &[]AttachmentImportData{
			{
				Path: model.NewString("file.jpg"),
			},
			{
				Path: model.NewString("somedir/file.jpg"),
			},
		}
		rewriteFilePaths(&line, "")
		require.Equal(t, expected, line.Post.Attachments)
		rewriteFilePaths(&line2, "")
		require.Equal(t, expected, line2.DirectPost.Attachments)
	})
	t.Run("valid path", func(t *testing.T) {
		// Every referenced file should now be prefixed with /tmp.
		expected := &[]AttachmentImportData{
			{
				Path: model.NewString("/tmp/file.jpg"),
			},
			{
				Path: model.NewString("/tmp/somedir/file.jpg"),
			},
		}
		t.Run("post attachments", func(t *testing.T) {
			rewriteFilePaths(&line, "/tmp")
			require.Equal(t, expected, line.Post.Attachments)
		})
		t.Run("direct post attachments", func(t *testing.T) {
			rewriteFilePaths(&line2, "/tmp")
			require.Equal(t, expected, line2.DirectPost.Attachments)
		})
		t.Run("profile image", func(t *testing.T) {
			expected := "/tmp/profile.jpg"
			rewriteFilePaths(&userLine, "/tmp")
			require.Equal(t, expected, *userLine.User.ProfileImage)
		})
		t.Run("emoji", func(t *testing.T) {
			expected := "/tmp/emoji.png"
			rewriteFilePaths(&emojiLine, "/tmp")
			require.Equal(t, expected, *emojiLine.Emoji.Image)
		})
	})
}
// BenchmarkBulkImport measures a full bulk import of the test archive,
// including file path rewriting via BulkImportWithPath.
func BenchmarkBulkImport(b *testing.B) {
	th := Setup(b)
	defer th.TearDown()

	testsDir, _ := fileutils.FindDir("tests")

	importFile, err := os.Open(testsDir + "/import_test.zip")
	require.Nil(b, err)
	defer importFile.Close()

	info, err := importFile.Stat()
	require.Nil(b, err)

	dir, err := ioutil.TempDir("", "testimport")
	require.Nil(b, err)
	defer os.RemoveAll(dir)

	_, err = utils.UnzipToPath(importFile, info.Size(), dir)
	require.Nil(b, err)

	jsonFile, err := os.Open(dir + "/import.jsonl")
	require.Nil(b, err)
	defer jsonFile.Close()

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// Rewind the JSONL reader so every iteration imports the full
		// data set; previously iterations after the first read from an
		// exhausted reader and effectively imported nothing.
		_, err := jsonFile.Seek(0, 0)
		require.Nil(b, err)

		appErr, _ := th.App.BulkImportWithPath(jsonFile, false, runtime.NumCPU(), dir)
		require.Nil(b, appErr)
	}

	b.StopTimer()
}

View file

@ -922,6 +922,28 @@ func (a *OpenTracingAppLayer) BulkImport(fileReader io.Reader, dryRun bool, work
return resultVar0, resultVar1
}
// BulkImportWithPath wraps app.BulkImportWithPath in an OpenTracing span,
// swapping the store context for the span's lifetime and flagging the span
// as errored when an *model.AppError is returned.
// NOTE(review): this layer looks auto-generated; edits may be lost on
// regeneration — confirm before relying on hand changes here.
func (a *OpenTracingAppLayer) BulkImportWithPath(fileReader io.Reader, dryRun bool, workers int, importPath string) (*model.AppError, int) {
	origCtx := a.ctx
	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.BulkImportWithPath")

	a.ctx = newCtx
	a.app.Srv().Store.SetContext(newCtx)
	defer func() {
		a.app.Srv().Store.SetContext(origCtx)
		a.ctx = origCtx
	}()

	defer span.Finish()
	resultVar0, resultVar1 := a.app.BulkImportWithPath(fileReader, dryRun, workers, importPath)

	if resultVar0 != nil {
		span.LogFields(spanlog.Error(resultVar0))
		ext.Error.Set(span, true)
	}

	return resultVar0, resultVar1
}
func (a *OpenTracingAppLayer) CancelJob(jobId string) *model.AppError {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CancelJob")
@ -3667,6 +3689,28 @@ func (a *OpenTracingAppLayer) FileReader(path string) (filesstore.ReadCloseSeeke
return resultVar0, resultVar1
}
// FileSize wraps app.FileSize in an OpenTracing span, swapping the store
// context for the span's lifetime and flagging the span as errored when an
// *model.AppError is returned.
// NOTE(review): this layer looks auto-generated; edits may be lost on
// regeneration — confirm before relying on hand changes here.
func (a *OpenTracingAppLayer) FileSize(path string) (int64, *model.AppError) {
	origCtx := a.ctx
	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.FileSize")

	a.ctx = newCtx
	a.app.Srv().Store.SetContext(newCtx)
	defer func() {
		a.app.Srv().Store.SetContext(origCtx)
		a.ctx = origCtx
	}()

	defer span.Finish()
	resultVar0, resultVar1 := a.app.FileSize(path)

	if resultVar1 != nil {
		span.LogFields(spanlog.Error(resultVar1))
		ext.Error.Set(span, true)
	}

	return resultVar0, resultVar1
}
func (a *OpenTracingAppLayer) FillInChannelProps(channel *model.Channel) *model.AppError {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.FillInChannelProps")
@ -10252,6 +10296,28 @@ func (a *OpenTracingAppLayer) ListDirectory(path string) ([]string, *model.AppEr
return resultVar0, resultVar1
}
// ListImports wraps app.ListImports in an OpenTracing span, swapping the
// store context for the span's lifetime and flagging the span as errored
// when an *model.AppError is returned.
// NOTE(review): this layer looks auto-generated; edits may be lost on
// regeneration — confirm before relying on hand changes here.
func (a *OpenTracingAppLayer) ListImports() ([]string, *model.AppError) {
	origCtx := a.ctx
	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.ListImports")

	a.ctx = newCtx
	a.app.Srv().Store.SetContext(newCtx)
	defer func() {
		a.app.Srv().Store.SetContext(origCtx)
		a.ctx = origCtx
	}()

	defer span.Finish()
	resultVar0, resultVar1 := a.app.ListImports()

	if resultVar1 != nil {
		span.LogFields(spanlog.Error(resultVar1))
		ext.Error.Set(span, true)
	}

	return resultVar0, resultVar1
}
func (a *OpenTracingAppLayer) ListPluginKeys(pluginId string, page int, perPage int) ([]string, *model.AppError) {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.ListPluginKeys")

View file

@ -19,6 +19,7 @@ import (
)
const minFirstPartSize = 5 * 1024 * 1024 // 5MB
const incompleteUploadSuffix = ".tmp"
func (a *App) runPluginsHook(info *model.FileInfo, file io.Reader) *model.AppError {
pluginsEnvironment := a.GetPluginsEnvironment()
@ -108,19 +109,25 @@ func (a *App) CreateUploadSession(us *model.UploadSession) (*model.UploadSession
us.FileOffset = 0
now := time.Now()
us.CreateAt = model.GetMillisForTime(now)
us.Path = now.Format("20060102") + "/teams/noteam/channels/" + us.ChannelId + "/users/" + us.UserId + "/" + us.Id + "/" + filepath.Base(us.Filename)
if us.Type == model.UploadTypeAttachment {
us.Path = now.Format("20060102") + "/teams/noteam/channels/" + us.ChannelId + "/users/" + us.UserId + "/" + us.Id + "/" + filepath.Base(us.Filename)
} else if us.Type == model.UploadTypeImport {
us.Path = *a.Config().ImportSettings.Directory + "/" + us.Id + "_" + filepath.Base(us.Filename)
}
if err := us.IsValid(); err != nil {
return nil, err
}
channel, err := a.GetChannel(us.ChannelId)
if err != nil {
return nil, model.NewAppError("CreateUploadSession", "app.upload.create.incorrect_channel_id.app_error",
map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest)
}
if channel.DeleteAt != 0 {
return nil, model.NewAppError("CreateUploadSession", "app.upload.create.cannot_upload_to_deleted_channel.app_error",
map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest)
if us.Type == model.UploadTypeAttachment {
channel, err := a.GetChannel(us.ChannelId)
if err != nil {
return nil, model.NewAppError("CreateUploadSession", "app.upload.create.incorrect_channel_id.app_error",
map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest)
}
if channel.DeleteAt != 0 {
return nil, model.NewAppError("CreateUploadSession", "app.upload.create.cannot_upload_to_deleted_channel.app_error",
map[string]interface{}{"channelId": us.ChannelId}, "", http.StatusBadRequest)
}
}
us, storeErr := a.Srv().Store.UploadSession().Save(us)
@ -186,6 +193,11 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo
nil, "FileOffset mismatch", http.StatusBadRequest)
}
uploadPath := us.Path
if us.Type == model.UploadTypeImport {
uploadPath += incompleteUploadSuffix
}
// make sure it's not possible to upload more data than what is expected.
lr := &io.LimitedReader{
R: rd,
@ -195,12 +207,12 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo
var written int64
if us.FileOffset == 0 {
// new upload
written, err = a.WriteFile(lr, us.Path)
written, err = a.WriteFile(lr, uploadPath)
if err != nil && written == 0 {
return nil, err
}
if written < minFirstPartSize && written != us.FileSize {
a.RemoveFile(us.Path)
a.RemoveFile(uploadPath)
var errStr string
if err != nil {
errStr = err.Error()
@ -210,7 +222,7 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo
}
} else if us.FileOffset < us.FileSize {
// resume upload
written, err = a.AppendFile(lr, us.Path)
written, err = a.AppendFile(lr, uploadPath)
}
if written > 0 {
us.FileOffset += written
@ -228,7 +240,7 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo
}
// upload is done, create FileInfo
file, err := a.FileReader(us.Path)
file, err := a.FileReader(uploadPath)
if err != nil {
return nil, model.NewAppError("UploadData", "app.upload.upload_data.read_file.app_error", nil, err.Error(), http.StatusInternalServerError)
}
@ -260,13 +272,19 @@ func (a *App) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo
nameWithoutExtension := info.Name[:strings.LastIndex(info.Name, ".")]
info.PreviewPath = filepath.Dir(info.Path) + "/" + nameWithoutExtension + "_preview.jpg"
info.ThumbnailPath = filepath.Dir(info.Path) + "/" + nameWithoutExtension + "_thumb.jpg"
imgData, fileErr := a.ReadFile(us.Path)
imgData, fileErr := a.ReadFile(uploadPath)
if fileErr != nil {
return nil, fileErr
}
a.HandleImages([]string{info.PreviewPath}, []string{info.ThumbnailPath}, [][]byte{imgData})
}
if us.Type == model.UploadTypeImport {
if err := a.MoveFile(uploadPath, us.Path); err != nil {
return nil, model.NewAppError("UploadData", "app.upload.upload_data.move_file.app_error", nil, err.Error(), http.StatusInternalServerError)
}
}
var storeErr error
if info, storeErr = a.Srv().Store.FileInfo().Save(info); storeErr != nil {
var appErr *model.AppError

View file

@ -1344,6 +1344,14 @@
"id": "api.file.file_exists.s3.app_error",
"translation": "Unable to check if the file exists."
},
{
"id": "api.file.file_size.local.app_error",
"translation": "Unable to get the file size."
},
{
"id": "api.file.file_size.s3.app_error",
"translation": "Unable to get the file size."
},
{
"id": "api.file.get_file.public_invalid.app_error",
"translation": "The public link does not appear to be valid."
@ -4254,6 +4262,10 @@
"id": "app.import.import_user_teams.save_preferences.error",
"translation": "Unable to save the team theme preferences"
},
{
"id": "app.import.marshal.app_error",
"translation": "Unable to marshal response."
},
{
"id": "app.import.process_import_data_file_version_line.invalid_version.error",
"translation": "Unable to read the version of the data import file."
@ -5518,6 +5530,10 @@
"id": "app.upload.upload_data.large_image.app_error",
"translation": "{{.Filename}} dimensions ({{.Width}} by {{.Height}} pixels) exceed the limits."
},
{
"id": "app.upload.upload_data.move_file.app_error",
"translation": "Failed to move uploaded file."
},
{
"id": "app.upload.upload_data.read_file.app_error",
"translation": "Failed to read a file."
@ -6626,6 +6642,30 @@
"id": "groups.unsupported_syncable_type",
"translation": "Unsupported syncable type '{{.Value}}'."
},
{
"id": "import_process.worker.do_job.file_exists",
"translation": "Unable to process import: file does not exists."
},
{
"id": "import_process.worker.do_job.missing_file",
"translation": "Unable to process import: import_file parameter is missing."
},
{
"id": "import_process.worker.do_job.missing_jsonl",
"translation": "Unable to process import: JSONL file is missing."
},
{
"id": "import_process.worker.do_job.open_file",
"translation": "Unable to process import: failed to open file."
},
{
"id": "import_process.worker.do_job.tmp_dir",
"translation": "Unable to process import: failed to create temporary directory."
},
{
"id": "import_process.worker.do_job.unzip",
"translation": "Unable to process import: failed to unzip file."
},
{
"id": "interactive_message.decode_trigger_id.base64_decode_failed",
"translation": "Failed to decode base64 for trigger ID for interactive dialog."
@ -7126,6 +7166,14 @@
"id": "model.config.is_valid.image_proxy_type.app_error",
"translation": "Invalid image proxy type. Must be 'local' or 'atmos/camo'."
},
{
"id": "model.config.is_valid.import.directory.app_error",
"translation": "Invalid value for Directory."
},
{
"id": "model.config.is_valid.import.retention_days_too_low.app_error",
"translation": "Invalid value for RetentionDays. Value is too low."
},
{
"id": "model.config.is_valid.ldap_basedn",
"translation": "AD/LDAP field \"BaseDN\" is required."

View file

@ -21,4 +21,7 @@ import (
// This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty.
_ "github.com/mattermost/mattermost-server/v5/jobs/product_notices"
// This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty.
_ "github.com/mattermost/mattermost-server/v5/jobs/import_process"
)

View file

@ -0,0 +1,196 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package import_process
import (
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"runtime"
"strconv"
"github.com/mattermost/mattermost-server/v5/app"
"github.com/mattermost/mattermost-server/v5/jobs"
tjobs "github.com/mattermost/mattermost-server/v5/jobs/interfaces"
"github.com/mattermost/mattermost-server/v5/mlog"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/utils"
)
// init registers the import-process job interface with the app layer so the
// job server can construct workers for this job type.
func init() {
	makeInterface := func(a *app.App) tjobs.ImportProcessInterface {
		return &ImportProcessInterfaceImpl{app: a}
	}
	app.RegisterJobsImportProcessInterface(makeInterface)
}
// ImportProcessInterfaceImpl implements the import-process jobs interface,
// acting as a factory for import-process workers.
type ImportProcessInterfaceImpl struct {
	app *app.App
}

// ImportProcessWorker processes bulk-import jobs dispatched by the job server.
type ImportProcessWorker struct {
	// name identifies this worker in log output.
	name string
	// stopChan is closed by Stop to request termination.
	stopChan chan struct{}
	// stoppedChan is closed by Run once the worker loop has exited.
	stoppedChan chan struct{}
	// jobsChan receives candidate jobs from the job watcher.
	jobsChan chan model.Job
	jobServer *jobs.JobServer
	app *app.App
}
// MakeWorker builds a new import-process worker wired to the server's job
// infrastructure. The channels it allocates are unbuffered.
func (i *ImportProcessInterfaceImpl) MakeWorker() model.Worker {
	worker := &ImportProcessWorker{
		app:         i.app,
		jobServer:   i.app.Srv().Jobs,
		name:        "ImportProcess",
		jobsChan:    make(chan model.Job),
		stopChan:    make(chan struct{}),
		stoppedChan: make(chan struct{}),
	}
	return worker
}
// JobChannel returns the send-only channel through which the job watcher
// dispatches candidate jobs to this worker.
func (w *ImportProcessWorker) JobChannel() chan<- model.Job {
	return w.jobsChan
}
// Run is the worker's main loop. It blocks, processing jobs received on
// jobsChan one at a time, until Stop closes stopChan; on exit it signals
// completion by closing stoppedChan.
func (w *ImportProcessWorker) Run() {
	mlog.Debug("Worker started", mlog.String("worker", w.name))

	defer func() {
		mlog.Debug("Worker finished", mlog.String("worker", w.name))
		close(w.stoppedChan)
	}()

	for {
		select {
		case job := <-w.jobsChan:
			// A candidate job arrived from the watcher; process it inline
			// so at most one job runs at a time.
			mlog.Debug("Worker received a new candidate job.", mlog.String("worker", w.name))
			w.doJob(&job)
		case <-w.stopChan:
			mlog.Debug("Worker received stop signal", mlog.String("worker", w.name))
			return
		}
	}
}
// Stop requests termination of the worker loop and blocks until Run has
// fully exited (signalled via stoppedChan).
func (w *ImportProcessWorker) Stop() {
	mlog.Debug("Worker stopping", mlog.String("worker", w.name))
	close(w.stopChan)
	<-w.stoppedChan
}
// doJob processes a single import job: it locates the uploaded import
// archive named by job.Data["import_file"] (relative to
// ImportSettings.Directory), extracts it to a temporary directory, runs the
// bulk importer on the contained JSONL file and, on success, deletes the
// import file. Any failure marks the job as errored via setJobError.
func (w *ImportProcessWorker) doJob(job *model.Job) {
	if claimed, err := w.jobServer.ClaimJob(job); err != nil {
		mlog.Warn("Worker experienced an error while trying to claim job",
			mlog.String("worker", w.name),
			mlog.String("job_id", job.Id),
			mlog.String("error", err.Error()))
		return
	} else if !claimed {
		// Another worker already claimed this job; nothing to do.
		return
	}

	importFileName, ok := job.Data["import_file"]
	if !ok {
		appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.missing_file", nil, "", http.StatusBadRequest)
		w.setJobError(job, appError)
		return
	}

	importFilePath := filepath.Join(*w.app.Config().ImportSettings.Directory, importFileName)
	if ok, err := w.app.FileExists(importFilePath); err != nil {
		w.setJobError(job, err)
		return
	} else if !ok {
		appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.file_exists", nil, "", http.StatusBadRequest)
		w.setJobError(job, appError)
		return
	}

	// The archive size is needed to open a zip reader on the stored file.
	importFileSize, appErr := w.app.FileSize(importFilePath)
	if appErr != nil {
		w.setJobError(job, appErr)
		return
	}

	importFile, appErr := w.app.FileReader(importFilePath)
	if appErr != nil {
		w.setJobError(job, appErr)
		return
	}
	defer importFile.Close()

	// TODO (MM-30187): improve this process by eliminating the need to unzip the import
	// file locally and instead do the whole bulk import process in memory by
	// streaming the import file.

	// create a temporary dir to extract the zipped import file.
	dir, err := ioutil.TempDir("", "import")
	if err != nil {
		appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.tmp_dir", nil, err.Error(), http.StatusInternalServerError)
		w.setJobError(job, appError)
		return
	}
	defer os.RemoveAll(dir)

	// Unzipping requires random access. Guard the assertion instead of
	// letting an unchecked type assertion panic the worker if the backend's
	// reader does not implement io.ReaderAt.
	importFileReader, ok := importFile.(io.ReaderAt)
	if !ok {
		appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.open_file", nil, "import file reader does not implement io.ReaderAt", http.StatusInternalServerError)
		w.setJobError(job, appError)
		return
	}

	// extract the contents of the zipped file.
	paths, err := utils.UnzipToPath(importFileReader, importFileSize, dir)
	if err != nil {
		appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.unzip", nil, err.Error(), http.StatusInternalServerError)
		w.setJobError(job, appError)
		return
	}

	// find JSONL import file.
	var jsonFilePath string
	for _, path := range paths {
		if filepath.Ext(path) == ".jsonl" {
			jsonFilePath = path
			break
		}
	}
	if jsonFilePath == "" {
		appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.missing_jsonl", nil, "", http.StatusBadRequest)
		w.setJobError(job, appError)
		return
	}

	jsonFile, err := os.Open(jsonFilePath)
	if err != nil {
		appError := model.NewAppError("ImportProcessWorker", "import_process.worker.do_job.open_file", nil, err.Error(), http.StatusInternalServerError)
		w.setJobError(job, appError)
		return
	}
	// Fix: the JSONL file was previously never closed, leaking one file
	// descriptor per processed import job.
	defer jsonFile.Close()

	// do the actual import.
	appErr, lineNumber := w.app.BulkImportWithPath(jsonFile, false, runtime.NumCPU(), dir)
	if appErr != nil {
		// Record the line at which the import failed to aid debugging.
		job.Data["line_number"] = strconv.Itoa(lineNumber)
		w.setJobError(job, appErr)
		return
	}

	// remove import file when done.
	if appErr := w.app.RemoveFile(importFilePath); appErr != nil {
		w.setJobError(job, appErr)
		return
	}

	mlog.Info("Worker: Job is complete", mlog.String("worker", w.name), mlog.String("job_id", job.Id))
	w.setJobSuccess(job)
}
// setJobSuccess marks the job as successfully completed. If recording the
// success itself fails, the job is marked as errored instead.
func (w *ImportProcessWorker) setJobSuccess(job *model.Job) {
	if err := w.app.Srv().Jobs.SetJobSuccess(job); err != nil {
		mlog.Error("Worker: Failed to set success for job", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
		w.setJobError(job, err)
	}
}
// setJobError marks the job as failed with the given application error.
// A failure to record the error state is only logged; there is no further
// fallback.
func (w *ImportProcessWorker) setJobError(job *model.Job, appError *model.AppError) {
	if err := w.app.Srv().Jobs.SetJobError(job, appError); err != nil {
		mlog.Error("Worker: Failed to set job error", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
	}
}

View file

@ -0,0 +1,12 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package interfaces
import (
"github.com/mattermost/mattermost-server/v5/model"
)
// ImportProcessInterface is implemented by job packages that can create
// workers for the import-process job type.
type ImportProcessInterface interface {
	MakeWorker() model.Worker
}

View file

@ -149,6 +149,13 @@ func (watcher *Watcher) PollAndNotify() {
default:
}
}
} else if job.Type == model.JOB_TYPE_IMPORT_PROCESS {
if watcher.workers.ImportProcess != nil {
select {
case watcher.workers.ImportProcess.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_CLOUD {
if watcher.workers.Cloud != nil {
select {

View file

@ -28,6 +28,7 @@ type JobServer struct {
ExpiryNotify tjobs.ExpiryNotifyJobInterface
ProductNotices tjobs.ProductNoticesJobInterface
ActiveUsers tjobs.ActiveUsersJobInterface
ImportProcess tjobs.ImportProcessInterface
Cloud ejobs.CloudJobInterface
}

View file

@ -27,6 +27,7 @@ type Workers struct {
ExpiryNotify model.Worker
ProductNotices model.Worker
ActiveUsers model.Worker
ImportProcess model.Worker
Cloud model.Worker
listenerId string
@ -82,6 +83,10 @@ func (srv *JobServer) InitWorkers() *Workers {
workers.ProductNotices = productNoticesInterface.MakeWorker()
}
if importProcessInterface := srv.ImportProcess; importProcessInterface != nil {
workers.ImportProcess = importProcessInterface.MakeWorker()
}
if cloudInterface := srv.Cloud; cloudInterface != nil {
workers.Cloud = cloudInterface.MakeWorker()
}
@ -137,6 +142,10 @@ func (workers *Workers) Start() *Workers {
go workers.ProductNotices.Run()
}
if workers.ImportProcess != nil {
go workers.ImportProcess.Run()
}
if workers.Cloud != nil {
go workers.Cloud.Run()
}
@ -245,10 +254,15 @@ func (workers *Workers) Stop() *Workers {
if workers.ActiveUsers != nil {
workers.ActiveUsers.Stop()
}
if workers.ProductNotices != nil {
workers.ProductNotices.Stop()
}
if workers.ImportProcess != nil {
workers.ImportProcess.Stop()
}
if workers.Cloud != nil {
workers.Cloud.Stop()
}

View file

@ -543,6 +543,10 @@ func (c *Client4) GetGroupSyncablesRoute(groupID string, syncableType GroupSynca
return fmt.Sprintf("%s/%ss", c.GetGroupRoute(groupID), strings.ToLower(syncableType.String()))
}
// GetImportsRoute returns the API route for the imports endpoints.
func (c *Client4) GetImportsRoute() string {
	return "/imports"
}
func (c *Client4) DoApiGet(url string, etag string) (*http.Response, *AppError) {
return c.DoApiRequest(http.MethodGet, c.ApiUrl+url, "", etag)
}
@ -5754,6 +5758,15 @@ func (c *Client4) UpdateCloudCustomerAddress(address *Address) (*CloudCustomer,
return customer, BuildResponse(r)
}
// ListImports fetches the names of the import files currently stored on the
// server.
func (c *Client4) ListImports() ([]string, *Response) {
	res, appErr := c.DoApiGet(c.GetImportsRoute(), "")
	if appErr != nil {
		return nil, BuildErrorResponse(res, appErr)
	}
	defer closeBody(res)
	imports := ArrayFromJson(res.Body)
	return imports, BuildResponse(res)
}
func (c *Client4) GetUserThreads(userId string, options GetUserThreadsOpts) (*Threads, *Response) {
v := url.Values{}
if options.Since != 0 {

View file

@ -117,6 +117,9 @@ const (
FILE_SETTINGS_DEFAULT_DIRECTORY = "./data/"
IMPORT_SETTINGS_DEFAULT_DIRECTORY = "./import"
IMPORT_SETTINGS_DEFAULT_RETENTION_DAYS = 30
EMAIL_SETTINGS_DEFAULT_FEEDBACK_ORGANIZATION = ""
SUPPORT_SETTINGS_DEFAULT_TERMS_OF_SERVICE_LINK = "https://about.mattermost.com/default-terms/"
@ -2854,6 +2857,37 @@ func (s *ImageProxySettings) SetDefaults(ss ServiceSettings) {
}
}
// ImportSettings defines configuration settings for file imports.
type ImportSettings struct {
	// The directory where to store the imported files.
	// Defaults to IMPORT_SETTINGS_DEFAULT_DIRECTORY ("./import") via SetDefaults.
	Directory *string
	// The number of days to retain the imported files before deleting them.
	// Defaults to IMPORT_SETTINGS_DEFAULT_RETENTION_DAYS (30) via SetDefaults.
	RetentionDays *int
}
// isValid reports the first invalid import setting as an AppError, or nil
// when the settings are usable. It assumes SetDefaults has already run, so
// both pointer fields are non-nil.
func (s *ImportSettings) isValid() *AppError {
	switch {
	case *s.Directory == "":
		return NewAppError("Config.IsValid", "model.config.is_valid.import.directory.app_error", nil, "", http.StatusBadRequest)
	case *s.RetentionDays <= 0:
		return NewAppError("Config.IsValid", "model.config.is_valid.import.retention_days_too_low.app_error", nil, "", http.StatusBadRequest)
	default:
		return nil
	}
}
// SetDefaults applies the default settings to the struct. A nil or empty
// Directory and a nil RetentionDays are replaced by their defaults.
func (s *ImportSettings) SetDefaults() {
	// An empty string is treated the same as unset so a cleared value falls
	// back to the default directory.
	if s.Directory == nil || *s.Directory == "" {
		s.Directory = NewString(IMPORT_SETTINGS_DEFAULT_DIRECTORY)
	}
	if s.RetentionDays == nil {
		s.RetentionDays = NewInt(IMPORT_SETTINGS_DEFAULT_RETENTION_DAYS)
	}
}
type ConfigFunc func() *Config
const ConfigAccessTagType = "access"
@ -2929,6 +2963,7 @@ type Config struct {
ImageProxySettings ImageProxySettings
CloudSettings CloudSettings
FeatureFlags *FeatureFlags `json:",omitempty"`
ImportSettings ImportSettings
}
func (o *Config) Clone() *Config {
@ -3032,6 +3067,7 @@ func (o *Config) SetDefaults() {
o.FeatureFlags = &FeatureFlags{}
o.FeatureFlags.SetDefaults()
}
o.ImportSettings.SetDefaults()
}
func (o *Config) IsValid() *AppError {
@ -3110,6 +3146,10 @@ func (o *Config) IsValid() *AppError {
if err := o.ImageProxySettings.isValid(); err != nil {
return err
}
if err := o.ImportSettings.isValid(); err != nil {
return err
}
return nil
}

View file

@ -1389,3 +1389,31 @@ func TestSetDefaultFeatureFlagBehaviour(t *testing.T) {
require.Equal(t, "somevalue", cfg.FeatureFlags.TestFeature)
}
// TestConfigImportSettingsDefaults verifies that Config.SetDefaults
// populates ImportSettings with the default directory and retention period.
func TestConfigImportSettingsDefaults(t *testing.T) {
	cfg := Config{}
	cfg.SetDefaults()
	require.Equal(t, "./import", *cfg.ImportSettings.Directory)
	require.Equal(t, 30, *cfg.ImportSettings.RetentionDays)
}
// TestConfigImportSettingsIsValid covers ImportSettings validation: the
// defaults are valid, an empty Directory is rejected, and a non-positive
// RetentionDays is rejected.
func TestConfigImportSettingsIsValid(t *testing.T) {
	cfg := Config{}
	cfg.SetDefaults()
	err := cfg.ImportSettings.isValid()
	require.Nil(t, err)
	// An empty directory must fail validation.
	*cfg.ImportSettings.Directory = ""
	err = cfg.ImportSettings.isValid()
	require.NotNil(t, err)
	require.Equal(t, "model.config.is_valid.import.directory.app_error", err.Id)
	// Reset to defaults before checking the retention lower bound.
	cfg.SetDefaults()
	*cfg.ImportSettings.RetentionDays = 0
	err = cfg.ImportSettings.isValid()
	require.NotNil(t, err)
	require.Equal(t, "model.config.is_valid.import.retention_days_too_low.app_error", err.Id)
}

View file

@ -22,6 +22,7 @@ const (
JOB_TYPE_EXPIRY_NOTIFY = "expiry_notify"
JOB_TYPE_PRODUCT_NOTICES = "product_notices"
JOB_TYPE_ACTIVE_USERS = "active_users"
JOB_TYPE_IMPORT_PROCESS = "import_process"
JOB_TYPE_CLOUD = "cloud"
JOB_STATUS_PENDING = "pending"
@ -66,6 +67,7 @@ func (j *Job) IsValid() *AppError {
case JOB_TYPE_PRODUCT_NOTICES:
case JOB_TYPE_EXPIRY_NOTIFY:
case JOB_TYPE_ACTIVE_USERS:
case JOB_TYPE_IMPORT_PROCESS:
case JOB_TYPE_CLOUD:
default:
return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest)

View file

@ -18,6 +18,9 @@ const (
UploadTypeImport UploadType = "import"
)
// UploadNoUserID is a "fake" user id used by the API layer when in local mode.
const UploadNoUserID = "nouser"
// UploadSession contains information used to keep track of a file upload.
type UploadSession struct {
// The unique identifier for the session.
@ -29,7 +32,7 @@ type UploadSession struct {
// The id of the user performing the upload.
UserId string `json:"user_id"`
// The id of the channel to upload to.
ChannelId string `json:"channel_id"`
ChannelId string `json:"channel_id,omitempty"`
// The name of the file to upload.
Filename string `json:"filename"`
// The path where the file is stored.
@ -109,7 +112,7 @@ func (us *UploadSession) IsValid() *AppError {
return NewAppError("UploadSession.IsValid", "model.upload_session.is_valid.type.app_error", nil, err.Error(), http.StatusBadRequest)
}
if !IsValidId(us.UserId) {
if !IsValidId(us.UserId) && us.UserId != UploadNoUserID {
return NewAppError("UploadSession.IsValid", "model.upload_session.is_valid.user_id.app_error", nil, "id="+us.Id, http.StatusBadRequest)
}

View file

@ -21,6 +21,7 @@ type FileBackend interface {
Reader(path string) (ReadCloseSeeker, *model.AppError)
ReadFile(path string) ([]byte, *model.AppError)
FileExists(path string) (bool, *model.AppError)
FileSize(path string) (int64, *model.AppError)
CopyFile(oldPath, newPath string) *model.AppError
MoveFile(oldPath, newPath string) *model.AppError
WriteFile(fr io.Reader, path string) (int64, *model.AppError)

View file

@ -7,6 +7,7 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"os"
"testing"
@ -365,6 +366,28 @@ func (s *FileBackendTestSuite) TestAppendFile() {
})
}
// TestFileSize exercises the FileSize backend method for a missing file and
// for a freshly written file of random size.
func (s *FileBackendTestSuite) TestFileSize() {
	s.Run("nonexistent file", func() {
		size, err := s.backend.FileSize("tests/nonexistentfile")
		s.NotNil(err)
		s.Zero(size)
	})
	s.Run("valid file", func() {
		// Random size in (0, 1MiB] so the check does not depend on a fixed
		// content length.
		data := make([]byte, rand.Intn(1024*1024)+1)
		path := "tests/" + model.NewId()
		written, err := s.backend.WriteFile(bytes.NewReader(data), path)
		s.Nil(err)
		s.EqualValues(len(data), written)
		defer s.backend.RemoveFile(path)
		size, err := s.backend.FileSize(path)
		s.Nil(err)
		s.Equal(int64(len(data)), size)
	})
}
func BenchmarkS3WriteFile(b *testing.B) {
utils.TranslationsPreInit()

View file

@ -63,6 +63,14 @@ func (b *LocalFileBackend) FileExists(path string) (bool, *model.AppError) {
return true, nil
}
// FileSize returns the size in bytes of the file at the given path,
// resolved relative to the backend's root directory.
func (b *LocalFileBackend) FileSize(path string) (int64, *model.AppError) {
	fullPath := filepath.Join(b.directory, path)
	info, err := os.Stat(fullPath)
	if err != nil {
		return 0, model.NewAppError("FileSize", "api.file.file_size.local.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return info.Size(), nil
}
func (b *LocalFileBackend) CopyFile(oldPath, newPath string) *model.AppError {
if err := utils.CopyFile(filepath.Join(b.directory, oldPath), filepath.Join(b.directory, newPath)); err != nil {
return model.NewAppError("copyFile", "api.file.move_file.rename.app_error", nil, err.Error(), http.StatusInternalServerError)

View file

@ -195,6 +195,17 @@ func (b *S3FileBackend) FileExists(path string) (bool, *model.AppError) {
return false, model.NewAppError("FileExists", "api.file.file_exists.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
}
// FileSize returns the size in bytes of the object at the given path by
// issuing a StatObject call against the backend's bucket.
func (b *S3FileBackend) FileSize(path string) (int64, *model.AppError) {
	fullPath := filepath.Join(b.pathPrefix, path)
	info, err := b.client.StatObject(context.Background(), b.bucket, fullPath, s3.StatObjectOptions{})
	if err != nil {
		return 0, model.NewAppError("FileSize", "api.file.file_size.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return info.Size, nil
}
func (b *S3FileBackend) CopyFile(oldPath, newPath string) *model.AppError {
oldPath = filepath.Join(b.pathPrefix, oldPath)
newPath = filepath.Join(b.pathPrefix, newPath)

View file

@ -93,9 +93,6 @@ func (us SqlUploadSessionStore) Get(id string) (*model.UploadSession, error) {
}
func (us SqlUploadSessionStore) GetForUser(userId string) ([]*model.UploadSession, error) {
if !model.IsValidId(userId) {
return nil, errors.New("SqlUploadSessionStore.GetForUser: userId is not valid")
}
query := us.getQueryBuilder().
Select("*").
From("UploadSessions").

View file

@ -145,12 +145,6 @@ func testUploadSessionStoreGetForUser(t *testing.T, ss store.Store) {
},
}
t.Run("getting invalid userId should fail", func(t *testing.T) {
us, err := ss.UploadSession().GetForUser("invalidId")
require.Error(t, err)
require.Nil(t, us)
})
t.Run("should return no sessions", func(t *testing.T) {
us, err := ss.UploadSession().GetForUser(userId)
require.NoError(t, err)

BIN
tests/import_test.zip Normal file

Binary file not shown.

BIN
tests/testarchive.zip Normal file

Binary file not shown.

65
utils/archive.go Normal file
View file

@ -0,0 +1,65 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package utils
import (
"archive/zip"
"fmt"
"io"
"os"
"path/filepath"
"strings"
)
// sanitizePath normalizes an archive entry name so extraction cannot escape
// the destination root. The path is cleaned, any remaining ".." sequences
// are stripped from the directory portion, and entries whose base name
// consists solely of dots (".", "..", "...", ...) are rejected by returning
// the empty string.
func sanitizePath(p string) string {
	base := filepath.Base(p)
	// A base made up only of dots is either a traversal component or a
	// no-op; treat the whole path as invalid.
	if strings.Trim(base, ".") == "" {
		return ""
	}
	dir := strings.ReplaceAll(filepath.Dir(filepath.Clean(p)), "..", "")
	return filepath.Join(dir, base)
}

// extractFile writes the contents of a single non-directory zip entry to the
// given destination path, creating missing parent directories. Both the
// source entry and the destination file are closed before returning, so
// descriptors are released per entry rather than accumulating in the caller.
func extractFile(f *zip.File, path string) error {
	// Create parent directories so archives that omit explicit directory
	// entries still extract cleanly.
	if err := os.MkdirAll(filepath.Dir(path), 0744); err != nil {
		return fmt.Errorf("failed to create directory: %w", err)
	}

	in, err := f.Open()
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer in.Close()

	out, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return fmt.Errorf("failed to create file: %w", err)
	}

	_, err = io.Copy(out, in)
	// Close explicitly (not deferred) so a failure while flushing written
	// data is reported instead of silently dropped.
	if cerr := out.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		return fmt.Errorf("failed to write to file: %w", err)
	}
	return nil
}

// UnzipToPath extracts a given zip archive into a given path.
// It returns a list of extracted paths.
func UnzipToPath(zipFile io.ReaderAt, size int64, outPath string) ([]string, error) {
	rd, err := zip.NewReader(zipFile, size)
	if err != nil {
		return nil, fmt.Errorf("failed to create reader: %w", err)
	}

	paths := make([]string, len(rd.File))
	for i, f := range rd.File {
		filePath := sanitizePath(f.Name)
		if filePath == "" {
			return nil, fmt.Errorf("invalid filepath `%s`", f.Name)
		}
		path := filepath.Join(outPath, filePath)
		paths[i] = path

		if f.FileInfo().IsDir() {
			// MkdirAll (rather than Mkdir) tolerates duplicate directory
			// entries and pre-existing directories.
			if err := os.MkdirAll(path, 0744); err != nil {
				return nil, fmt.Errorf("failed to create directory: %w", err)
			}
			continue
		}

		// Fix: the original deferred file closes inside this loop, holding
		// every extracted file's descriptors open until the function
		// returned; the helper closes them per entry.
		if err := extractFile(f, path); err != nil {
			return nil, err
		}
	}

	return paths, nil
}

149
utils/archive_test.go Normal file
View file

@ -0,0 +1,149 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package utils
import (
"archive/zip"
"errors"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/mattermost/mattermost-server/v5/utils/fileutils"
"github.com/stretchr/testify/require"
)
// TestSanitizePath verifies that sanitizePath strips traversal components
// ("..") and rejects base names made up entirely of dots, while preserving
// legitimate dotted file names.
func TestSanitizePath(t *testing.T) {
	cases := []struct {
		input    string
		expected string
	}{
		{
			".",
			"",
		},
		{
			"../",
			"",
		},
		{
			"...",
			"",
		},
		{
			"..//.",
			"",
		},
		{
			"/../",
			"",
		},
		{
			"/path/...../to/file",
			"/path/to/file",
		},
		{
			"/path/to/file...",
			"/path/to/file...",
		},
		{
			"/path/to/../../../file",
			"/file",
		},
		{
			"../../../../file",
			"/file",
		},
		{
			"/path/to/file..ext",
			"/path/to/file..ext",
		},
		{
			"/path/to/...file..ext",
			"/path/to/...file..ext",
		},
		{
			"./path/to/...file..ext",
			"path/to/...file..ext",
		},
		{
			"./...file",
			"...file",
		},
		{
			"path/",
			"path",
		},
	}
	for _, c := range cases {
		t.Run(c.input, func(t *testing.T) {
			require.Equal(t, c.expected, sanitizePath(c.input))
		})
	}
}
// TestUnzipToPath extracts fixture archives from the tests directory into a
// temporary directory, checking that a non-zip file is rejected and that a
// valid archive yields the expected files, directories and sizes.
func TestUnzipToPath(t *testing.T) {
	testDir, _ := fileutils.FindDir("tests")
	require.NotEmpty(t, testDir)
	dir, err := ioutil.TempDir("", "unzip")
	require.Nil(t, err)
	defer os.RemoveAll(dir)
	t.Run("invalid archive", func(t *testing.T) {
		// A gzipped tarball is not a zip; UnzipToPath must fail with
		// zip.ErrFormat and return no paths.
		file, err := os.Open(testDir + "/testplugin.tar.gz")
		require.Nil(t, err)
		defer file.Close()
		info, err := file.Stat()
		require.Nil(t, err)
		paths, err := UnzipToPath(file, info.Size(), dir)
		require.NotNil(t, err)
		require.True(t, errors.Is(err, zip.ErrFormat))
		require.Nil(t, paths)
	})
	t.Run("valid archive", func(t *testing.T) {
		file, err := os.Open(testDir + "/testarchive.zip")
		require.Nil(t, err)
		defer file.Close()
		info, err := file.Stat()
		require.Nil(t, err)
		paths, err := UnzipToPath(file, info.Size(), dir)
		require.Nil(t, err)
		require.NotEmpty(t, paths)
		// Expected extraction layout of testarchive.zip (sizes in bytes).
		expectedFiles := map[string]int64{
			dir + "/testfile.txt":          446,
			dir + "/testdir/testfile2.txt": 866,
			dir + "/testdir2/testfile3.txt": 845,
		}
		expectedDirs := []string{
			dir + "/testdir",
			dir + "/testdir2",
		}
		// Walk the output directory and verify everything on disk was
		// reported in the returned paths and matches the expected layout.
		err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			require.Nil(t, err)
			if path == dir {
				return nil
			}
			require.Contains(t, paths, path)
			if info.IsDir() {
				require.Contains(t, expectedDirs, path)
			} else {
				require.Equal(t, expectedFiles[path], info.Size())
			}
			return nil
		})
		require.Nil(t, err)
	})
}