Make S3 upload part size configurable (#26533)

This commit is contained in:
Claudio Costa 2024-04-05 07:20:48 -06:00 committed by GitHub
parent 931abcb24e
commit bc6182229b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 142 additions and 32 deletions

View file

@ -62,6 +62,7 @@ type FileBackendSettings struct {
SkipVerify bool
AmazonS3RequestTimeoutMilliseconds int64
AmazonS3PresignExpiresSeconds int64
AmazonS3UploadPartSizeBytes int64
}
func NewFileBackendSettingsFromConfig(fileSettings *model.FileSettings, enableComplianceFeature bool, skipVerify bool) FileBackendSettings {
@ -85,6 +86,7 @@ func NewFileBackendSettingsFromConfig(fileSettings *model.FileSettings, enableCo
AmazonS3Trace: fileSettings.AmazonS3Trace != nil && *fileSettings.AmazonS3Trace,
AmazonS3RequestTimeoutMilliseconds: *fileSettings.AmazonS3RequestTimeoutMilliseconds,
SkipVerify: skipVerify,
AmazonS3UploadPartSizeBytes: *fileSettings.AmazonS3UploadPartSizeBytes,
}
}
@ -109,6 +111,7 @@ func NewExportFileBackendSettingsFromConfig(fileSettings *model.FileSettings, en
AmazonS3Trace: fileSettings.ExportAmazonS3Trace != nil && *fileSettings.ExportAmazonS3Trace,
AmazonS3RequestTimeoutMilliseconds: *fileSettings.ExportAmazonS3RequestTimeoutMilliseconds,
AmazonS3PresignExpiresSeconds: *fileSettings.ExportAmazonS3PresignExpiresSeconds,
AmazonS3UploadPartSizeBytes: *fileSettings.ExportAmazonS3UploadPartSizeBytes,
SkipVerify: skipVerify,
}
}

View file

@ -8,6 +8,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"os"
"strings"
@ -584,7 +585,24 @@ func (s *FileBackendTestSuite) TestFileModTime() {
}
func BenchmarkS3WriteFile(b *testing.B) {
settings := FileBackendSettings{
fileSizes := []int{
1024 * 100, // 100KB
1024 * 1024, // 1MB
1024 * 1024 * 10, // 10MB
1024 * 1024 * 100, // 100MB
1024 * 1024 * 1000, // 1GB
1024 * 1024 * 10000, // 10GB
}
partSizes := []int64{
1024 * 1024 * 5, // 5MB
1024 * 1024 * 10, // 10MB
1024 * 1024 * 25, // 25MB
1024 * 1024 * 100, // 100MB
1024 * 1024 * 200, // 200MB
}
defaultSettings := FileBackendSettings{
DriverName: driverS3,
AmazonS3AccessKeyId: "minioaccesskey",
AmazonS3SecretAccessKey: "miniosecretkey",
@ -594,29 +612,81 @@ func BenchmarkS3WriteFile(b *testing.B) {
AmazonS3PathPrefix: "",
AmazonS3SSL: false,
AmazonS3SSE: false,
AmazonS3RequestTimeoutMilliseconds: 20000,
AmazonS3RequestTimeoutMilliseconds: 300 * 1000,
}
backend, err := NewFileBackend(settings)
require.NoError(b, err)
// The following overrides make it easier to test these against different backends
// (e.g. S3 instead of minio).
if val := os.Getenv("MM_FILESETTINGS_AMAZONS3BUCKET"); val != "" {
defaultSettings.AmazonS3Bucket = val
}
if val := os.Getenv("MM_FILESETTINGS_AMAZONS3REGION"); val != "" {
defaultSettings.AmazonS3Region = val
}
if val := os.Getenv("MM_FILESETTINGS_AMAZONS3ACCESSKEYID"); val != "" {
defaultSettings.AmazonS3AccessKeyId = val
}
if val := os.Getenv("MM_FILESETTINGS_AMAZONS3SECRETACCESSKEY"); val != "" {
defaultSettings.AmazonS3SecretAccessKey = val
}
if val := os.Getenv("MM_FILESETTINGS_AMAZONS3ENDPOINT"); val != "" {
defaultSettings.AmazonS3Endpoint = val
}
if val := os.Getenv("MM_FILESETTINGS_AMAZONS3TRACE"); val == "true" {
defaultSettings.AmazonS3Trace = true
}
// This is needed to create the bucket if it doesn't exist.
require.NoError(b, backend.TestConnection())
backendMap := make(map[int64]FileBackend, len(partSizes))
for _, partSize := range partSizes {
settings := defaultSettings
settings.AmazonS3UploadPartSizeBytes = partSize
path := "tests/" + randomString()
size := 1 * 1024 * 1024
data := make([]byte, size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
written, err := backend.WriteFile(bytes.NewReader(data), path)
defer backend.RemoveFile(path)
backend, err := NewFileBackend(settings)
require.NoError(b, err)
require.Len(b, data, int(written))
// This is needed to create the bucket if it doesn't exist.
err = backend.TestConnection()
if _, ok := err.(*S3FileBackendNoBucketError); ok {
require.NoError(b, backend.(*S3FileBackend).MakeBucket())
} else {
require.NoError(b, err)
}
backendMap[partSize] = backend
}
b.StopTimer()
bufferSize := 1024 * 1024 // 4MB
buffer := make([]byte, bufferSize)
for _, size := range fileSizes {
for _, partSize := range partSizes {
backend := backendMap[partSize]
b.Run(fmt.Sprintf("FileSize-%dMB_PartSize-%dMB", int(math.Round(float64(size)/1024/1024)), int(math.Round(float64(partSize)/1024/1024))), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
rd, wr := io.Pipe()
go func() {
defer wr.Close()
for i := 0; i < size; i += bufferSize {
b := buffer
if size < bufferSize {
b = b[:size]
}
wr.Write(b)
}
}()
path := "tests/" + randomString()
b.StartTimer()
written, err := backend.WriteFile(rd, path)
b.StopTimer()
require.NoError(b, err)
require.Equal(b, size, int(written))
err = backend.RemoveFile(path)
require.NoError(b, err)
}
})
}
}
}
func TestNewExportFileBackendSettingsFromConfig(t *testing.T) {
@ -670,6 +740,7 @@ func TestNewExportFileBackendSettingsFromConfig(t *testing.T) {
SkipVerify: true,
AmazonS3RequestTimeoutMilliseconds: 1000,
AmazonS3PresignExpiresSeconds: 60000,
AmazonS3UploadPartSizeBytes: model.FileSettingsDefaultS3ExportUploadPartSizeBytes,
}
actual := NewExportFileBackendSettingsFromConfig(&model.FileSettings{
@ -686,6 +757,7 @@ func TestNewExportFileBackendSettingsFromConfig(t *testing.T) {
ExportAmazonS3Trace: model.NewBool(true),
ExportAmazonS3RequestTimeoutMilliseconds: model.NewInt64(1000),
ExportAmazonS3PresignExpiresSeconds: model.NewInt64(60000),
ExportAmazonS3UploadPartSizeBytes: model.NewInt64(model.FileSettingsDefaultS3ExportUploadPartSizeBytes),
}, enableComplianceFeature, skipVerify)
require.Equal(t, expected, actual)
@ -711,6 +783,7 @@ func TestNewExportFileBackendSettingsFromConfig(t *testing.T) {
SkipVerify: true,
AmazonS3RequestTimeoutMilliseconds: 1000,
AmazonS3PresignExpiresSeconds: 60000,
AmazonS3UploadPartSizeBytes: model.FileSettingsDefaultS3ExportUploadPartSizeBytes,
}
actual := NewExportFileBackendSettingsFromConfig(&model.FileSettings{
@ -727,6 +800,7 @@ func TestNewExportFileBackendSettingsFromConfig(t *testing.T) {
ExportAmazonS3Trace: model.NewBool(true),
ExportAmazonS3RequestTimeoutMilliseconds: model.NewInt64(1000),
ExportAmazonS3PresignExpiresSeconds: model.NewInt64(60000),
ExportAmazonS3UploadPartSizeBytes: model.NewInt64(model.FileSettingsDefaultS3ExportUploadPartSizeBytes),
}, enableComplianceFeature, skipVerify)
require.Equal(t, expected, actual)

View file

@ -42,6 +42,7 @@ type S3FileBackend struct {
timeout time.Duration
presignExpires time.Duration
isCloud bool // field to indicate whether this is running under Mattermost cloud or not.
uploadPartSize int64
}
type S3FileBackendAuthError struct {
@ -114,6 +115,7 @@ func newS3FileBackend(settings FileBackendSettings, isCloud bool) (*S3FileBacken
skipVerify: settings.SkipVerify,
timeout: timeout,
presignExpires: time.Duration(settings.AmazonS3PresignExpiresSeconds) * time.Second,
uploadPartSize: settings.AmazonS3UploadPartSizeBytes,
}
cli, err := backend.s3New(isCloud)
if err != nil {
@ -497,7 +499,7 @@ func (b *S3FileBackend) WriteFileContext(ctx context.Context, fr io.Reader, path
contentType = "binary/octet-stream"
}
options := s3PutOptions(b.encrypt, contentType)
options := s3PutOptions(b.encrypt, contentType, b.uploadPartSize)
objSize := int64(-1)
if b.isCloud {
@ -541,7 +543,7 @@ func (b *S3FileBackend) AppendFile(fr io.Reader, path string) (int64, error) {
contentType = "binary/octet-stream"
}
options := s3PutOptions(b.encrypt, contentType)
options := s3PutOptions(b.encrypt, contentType, b.uploadPartSize)
sse := options.ServerSideEncryption
partName := fp + ".part"
ctx2, cancel2 := context.WithTimeout(context.Background(), b.timeout)
@ -745,15 +747,13 @@ func (b *S3FileBackend) prefixedPath(s string) (string, error) {
return filepath.Join(b.pathPrefix, s), nil
}
func s3PutOptions(encrypted bool, contentType string) s3.PutObjectOptions {
func s3PutOptions(encrypted bool, contentType string, uploadPartSize int64) s3.PutObjectOptions {
options := s3.PutObjectOptions{}
if encrypted {
options.ServerSideEncryption = encrypt.NewSSE()
}
options.ContentType = contentType
// We set the part size to the minimum allowed value of 5MBs
// to avoid an excessive allocation in minio.PutObject implementation.
options.PartSize = 1024 * 1024 * 5
options.PartSize = uint64(uploadPartSize)
return options
}

View file

@ -4,12 +4,12 @@
package filestore
import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http/httptest"
"net/http/httputil"
"net/url"
@ -116,15 +116,36 @@ func TestTimeout(t *testing.T) {
AmazonS3RequestTimeoutMilliseconds: 0,
}
fileBackend, err := NewS3FileBackend(cfg)
require.NoError(t, err)
t.Run("MakeBucket", func(t *testing.T) {
fileBackend, err := NewS3FileBackend(cfg)
require.NoError(t, err)
err = fileBackend.MakeBucket()
require.True(t, errors.Is(err, context.DeadlineExceeded))
err = fileBackend.MakeBucket()
require.True(t, errors.Is(err, context.DeadlineExceeded))
})
path := "tests/" + randomString() + ".png"
_, err = fileBackend.WriteFile(bytes.NewReader([]byte("testimage")), path)
require.True(t, errors.Is(err, context.DeadlineExceeded))
t.Run("WriteFile", func(t *testing.T) {
cfg.AmazonS3RequestTimeoutMilliseconds = 1000
fileBackend, err := NewS3FileBackend(cfg)
require.NoError(t, err)
err = fileBackend.MakeBucket()
require.NoError(t, err)
r, w := io.Pipe()
go func() {
defer w.Close()
for i := 0; i < 10; i++ {
_, writeErr := w.Write([]byte("data"))
require.NoError(t, writeErr)
time.Sleep(time.Millisecond * 200)
}
}()
_, err = fileBackend.WriteFile(r, "tests/"+randomString()+".png")
require.True(t, errors.Is(err, context.DeadlineExceeded))
})
}
func TestInsecureMakeBucket(t *testing.T) {

View file

@ -122,7 +122,9 @@ const (
SqlSettingsDefaultDataSource = "postgres://mmuser:mostest@localhost/mattermost_test?sslmode=disable&connect_timeout=10&binary_parameters=yes"
FileSettingsDefaultDirectory = "./data/"
FileSettingsDefaultDirectory = "./data/"
FileSettingsDefaultS3UploadPartSizeBytes = 5 * 1024 * 1024 // 5MB
FileSettingsDefaultS3ExportUploadPartSizeBytes = 100 * 1024 * 1024 // 100MB
ImportSettingsDefaultDirectory = "./import"
ImportSettingsDefaultRetentionDays = 30
@ -1579,6 +1581,7 @@ type FileSettings struct {
AmazonS3SSE *bool `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
AmazonS3Trace *bool `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
AmazonS3RequestTimeoutMilliseconds *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"` // telemetry: none
AmazonS3UploadPartSizeBytes *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"` // telemetry: none
// Export store settings
DedicatedExportStore *bool `access:"environment_file_storage,write_restrictable"`
ExportDriverName *string `access:"environment_file_storage,write_restrictable"`
@ -1595,6 +1598,7 @@ type FileSettings struct {
ExportAmazonS3Trace *bool `access:"environment_file_storage,write_restrictable"`
ExportAmazonS3RequestTimeoutMilliseconds *int64 `access:"environment_file_storage,write_restrictable"` // telemetry: none
ExportAmazonS3PresignExpiresSeconds *int64 `access:"environment_file_storage,write_restrictable"` // telemetry: none
ExportAmazonS3UploadPartSizeBytes *int64 `access:"environment_file_storage,write_restrictable"` // telemetry: none
}
func (s *FileSettings) SetDefaults(isUpdate bool) {
@ -1703,6 +1707,10 @@ func (s *FileSettings) SetDefaults(isUpdate bool) {
s.AmazonS3RequestTimeoutMilliseconds = NewInt64(30000)
}
if s.AmazonS3UploadPartSizeBytes == nil {
s.AmazonS3UploadPartSizeBytes = NewInt64(FileSettingsDefaultS3UploadPartSizeBytes)
}
if s.DedicatedExportStore == nil {
s.DedicatedExportStore = NewBool(false)
}
@ -1764,6 +1772,10 @@ func (s *FileSettings) SetDefaults(isUpdate bool) {
if s.ExportAmazonS3PresignExpiresSeconds == nil {
s.ExportAmazonS3PresignExpiresSeconds = NewInt64(21600) // 6h
}
if s.ExportAmazonS3UploadPartSizeBytes == nil {
s.ExportAmazonS3UploadPartSizeBytes = NewInt64(FileSettingsDefaultS3ExportUploadPartSizeBytes)
}
}
type EmailSettings struct {