diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index e0f901738..818735813 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -450,6 +450,21 @@ func collectTargets(opts BackupOptions, args []string, warnf func(msg string, ar return nil, errors.Fatal("nothing to backup, please specify source files/dirs") } + // example "s3://bucketname/maybe-folder" + if strings.HasPrefix(targets[0], fs.S3Prefix) { + for _, target := range targets { + if !strings.HasPrefix(target, fs.S3Prefix) { + return nil, errors.Fatalf("target=%s has not prefix %s", target, fs.S3Prefix) + + } + paths := strings.Split(strings.TrimPrefix(target, fs.S3Prefix), "/") + if len(paths) < 2 || paths[1] == "" { + return nil, errors.Fatalf("target=%s has not bucketName", target) + } + } + return targets, nil + } + return filterExisting(targets, warnf) } @@ -506,6 +521,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts global.Options, te success := true targets, err := collectTargets(opts, args, printer.E, term.InputRaw()) + if err != nil { if errors.Is(err, ErrInvalidSourceData) { success = false @@ -514,6 +530,16 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts global.Options, te } } + isS3Source := false + if len(targets) > 0 { + isS3Source = strings.HasPrefix(targets[0], fs.S3Prefix) + if isS3Source { + for i, target := range targets { + targets[i] = strings.TrimPrefix(target, fs.S3Prefix) + } + } + } + timeStamp := time.Now() backupStart := timeStamp if opts.TimeStamp != "" { @@ -611,6 +637,15 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts global.Options, te targets = []string{filename} } + if isS3Source { + s3Source := &fs.S3Source{} + err := s3Source.WarmingUp(targets) + if err != nil { + return err + } + targetFS = s3Source + } + if backupFSTestHook != nil { targetFS = backupFSTestHook(targetFS) } diff --git a/internal/fs/fs_s3.go b/internal/fs/fs_s3.go new file mode 100644 index 000000000..b09445d5d --- /dev/null +++ b/internal/fs/fs_s3.go @@ -0,0 +1,305 @@ +package fs + +import ( + "context" + "fmt" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/restic/restic/internal/data" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" + "io" + "io/fs" + "net/url" + "os" + "path" + "strings" + "sync" + "time" +) + +const S3Prefix = "s3:/" +const basePermissionFile fs.FileMode = 0644 +const basePermissionFolder fs.FileMode = os.ModeDir | 0755 + +type S3Source struct { + s3Client *minio.Client + files map[string]*ExtendedFileInfo + filesByFolder map[string][]string +} + +// statically ensure that S3Source implements FS. +var _ FS = &S3Source{} + +func (fs *S3Source) VolumeName(_ string) string { + return "" +} + +// OpenFile opens a file or directory for reading. +func (fs *S3Source) OpenFile(name string, _ int, metadataOnly bool) (File, error) { + name = s3CleanPath(name) + if name == "/" { + return nil, fmt.Errorf("invalid filename specified") + } + + fi, ok := fs.files[name] + if !ok { + return nil, pathError("open file", name, os.ErrNotExist) + } + + return newS3SourceFile(name, fi, fs.s3Client, + // is not folder, value is nil + fs.filesByFolder[name], metadataOnly) +} + +func (fs *S3Source) factoryS3Client() (*minio.Client, error) { + endpoint := os.Getenv("AWS_ENDPOINT_URL") + accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID") + secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + if accessKeyID == "" && secretAccessKey != "" { + return nil, errors.Fatalf("no credentials found. $AWS_SECRET_ACCESS_KEY is set but $AWS_ACCESS_KEY_ID is empty") + } else if accessKeyID != "" && secretAccessKey == "" { + return nil, errors.Fatalf("no credentials found. $AWS_ACCESS_KEY_ID is set but $AWS_SECRET_ACCESS_KEY is empty") + } else if endpoint == "" { + return nil, errors.Fatalf("no credentials found. $AWS_ENDPOINT_URL is empty") + } + + urlEndpoint, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + s3Client, err := minio.New(urlEndpoint.Host, &minio.Options{ + Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), + Secure: urlEndpoint.Scheme == "https", + }) + if err != nil { + return nil, err + } + + return s3Client, nil +} + +func (fs *S3Source) WarmingUp(targets []string) error { + stateDate := time.Now() + defer func() { + debug.Log("s3 duration warming up %s", time.Since(stateDate)) + }() + + var err error + fs.s3Client, err = fs.factoryS3Client() + + if err != nil { + return err + } + + var muFilesByFolder sync.Mutex + filesByFolder := make(map[string][]string) + var muFiles sync.Mutex + files := make(map[string]*ExtendedFileInfo) + + var wg sync.WaitGroup + wg.Add(len(targets)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error, len(targets)) + for _, target := range targets { + partPath := strings.Split(target, "/") + // example /bucket-name + bucketName := partPath[1] + prefix := path.Join(partPath[2:]...) + root := path.Join("/", bucketName) + + go func() { + defer wg.Done() + for obj := range fs.s3Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{Recursive: true, Prefix: prefix}) { + if obj.Err != nil { + if ctx.Err() == nil { + select { + case errCh <- obj.Err: + default: + } + } + cancel() + return + } + + absPath := path.Join(root, obj.Key) + for currPath := absPath; ; { + currPath = path.Clean(path.Dir(currPath)) + if currPath == "/" { + break + } + + muFiles.Lock() + if _, exists := files[currPath]; exists { + muFiles.Unlock() + // this tree already added + break + } + files[currPath] = &ExtendedFileInfo{ + Name: path.Base(currPath), + Mode: basePermissionFolder, + ModTime: time.Unix(0, 0), + ChangeTime: time.Unix(0, 0), + Size: 0, + } + muFiles.Unlock() + } + { + dir, file := path.Split(absPath) + dir = path.Clean(dir) + muFilesByFolder.Lock() + filesByFolder[dir] = append(filesByFolder[dir], file) + muFilesByFolder.Unlock() + } + + muFiles.Lock() + files[absPath] = &ExtendedFileInfo{ + Name: path.Base(absPath), + Mode: basePermissionFile, + ModTime: obj.LastModified, + ChangeTime: obj.LastModified, + Size: obj.Size, + } + muFiles.Unlock() + } + }() + } + wg.Wait() + close(errCh) + + select { + case err, ok := <-errCh: + if err != nil && ok { + return err + } + default: + } + + fs.filesByFolder = filesByFolder + fs.files = files + return nil +} + +// Lstat returns the FileInfo structure describing the named file. +// If there is an error, it will be of type *os.PathError. +func (fs *S3Source) Lstat(name string) (*ExtendedFileInfo, error) { + name = s3CleanPath(name) + info, ok := fs.files[name] + if !ok { + return nil, pathError("lstat", name, os.ErrNotExist) + } + return info, nil +} + +func (fs *S3Source) Join(elem ...string) string { + return path.Join(elem...) +} + +func (fs *S3Source) Separator() string { + return "/" +} + +func (fs *S3Source) IsAbs(p string) bool { + return path.IsAbs(p) +} +func (fs *S3Source) Abs(p string) (string, error) { + return s3CleanPath(p), nil +} + +func s3CleanPath(name string) string { + return path.Clean("/" + name) +} + +func (fs *S3Source) Clean(p string) string { + return path.Clean(p) +} + +func (fs *S3Source) Base(p string) string { + return path.Base(p) +} + +func (fs *S3Source) Dir(p string) string { + return path.Dir(p) +} + +type s3SourceFile struct { + rc io.ReadCloser + name string + fi *ExtendedFileInfo + filesInFolder []string + s3Client *minio.Client +} + +// See the File interface for a description of each method +var _ File = &s3SourceFile{} + +func newS3SourceFile(name string, fi *ExtendedFileInfo, s3Client *minio.Client, filesInFolder []string, metadataOnly bool) (*s3SourceFile, error) { + name = s3CleanPath(name) + if metadataOnly || fi.Mode.IsDir() { + return &s3SourceFile{name: name, fi: fi, rc: nil, filesInFolder: filesInFolder, s3Client: s3Client}, nil + } + + partPath := strings.Split(name, "/") + // example /bucket-name + bucketName := partPath[1] + objPath := path.Join(partPath[2:]...) + ctx := context.Background() + object, err := s3Client.GetObject(ctx, bucketName, objPath, minio.GetObjectOptions{}) + if err != nil { + return nil, pathError("open file s3", name, os.ErrNotExist) + } + return &s3SourceFile{name: name, fi: fi, rc: object, filesInFolder: filesInFolder, s3Client: s3Client}, nil + +} + +func (f *s3SourceFile) MakeReadable() error { + if f.rc != nil { + panic("s3 file is already readable") + } + + newF, err := newS3SourceFile(f.name, f.fi, f.s3Client, f.filesInFolder, false) + if err != nil { + return err + } + // replace state and also reset cached FileInfo + *f = *newF + return nil +} + +func (f *s3SourceFile) Stat() (*ExtendedFileInfo, error) { + return f.fi, nil +} + +func (f *s3SourceFile) ToNode(_ bool, _ func(format string, args ...any)) (*data.Node, error) { + node := buildBasicNode(f.name, f.fi) + + //TODO: change on info about owner in repo + node.UID = 0 //uint32(os.Getuid()) + node.GID = 0 //uint32(os.Getgid()) + node.ChangeTime = node.ModTime + + return node, nil +} + +func (f *s3SourceFile) Read(p []byte) (n int, err error) { + if f.rc != nil { + return f.rc.Read(p) + } + + return 0, pathError("read", f.name, os.ErrNotExist) +} + +func (f *s3SourceFile) Readdirnames(_ int) ([]string, error) { + if f.filesInFolder == nil { + return []string{}, pathError("Readdirnames", f.name, os.ErrNotExist) + } + return f.filesInFolder, nil +} + +func (f *s3SourceFile) Close() error { + if f.rc != nil { + return f.rc.Close() + } + return nil +}